/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

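/**
 * Tool to sync a target HBase table with a source table, driven by the hash manifest previously
 * written by {@link HashTable} for the source. Key ranges whose hashes already match are skipped,
 * so only the differing ranges are re-scanned cell by cell, keeping data transfer proportional to
 * the amount of divergence rather than to table size.
 */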
@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_URI_CONF_KEY = "sync.table.source.uri";
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #SOURCE_URI_CONF_KEY} instead.
   */
  @Deprecated
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_URI_CONF_KEY = "sync.table.target.uri";
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #TARGET_URI_CONF_KEY} instead.
   */
  @Deprecated
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  URI sourceUri;
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #sourceUri} instead.
   */
  @Deprecated
  String sourceZkCluster;
  URI targetUri;
  /**
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #targetUri} instead.
   */
  @Deprecated
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;
  boolean ignoreTimestamps;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

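  /**
   * Obtain security credentials (delegation tokens) for the cluster identified by the given zk
   * cluster key, so the job can authenticate against secure remote clusters.
   */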
  private void initCredentialsForHBase(String clusterKey, Job job) throws IOException {
    Configuration peerConf =
      HBaseConfiguration.createClusterConf(job.getConfiguration(), clusterKey);
    TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
  }

  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    Job job = Job.getInstance(getConf(),
      getConf().get("mapreduce.job.name", "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) {
      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { sourceHashDir },
        getConf());
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
        + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
        + " should be 1 more than the number of partition keys. However, the manifest file"
        + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
        + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
        + " should be 1 more than the number of partition keys. However, the number of data dirs"
        + " found is " + dataSubdirCount + " but the number of partition keys"
        + " found in the partitions file is " + tableHash.partitions.size());
    }

    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceUri != null) {
      jobConf.set(SOURCE_URI_CONF_KEY, sourceUri.toString());
      TableMapReduceUtil.initCredentialsForCluster(job, jobConf, sourceUri);
    } else if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
      initCredentialsForHBase(sourceZkCluster, job);
    }
    if (targetUri != null) {
      jobConf.set(TARGET_URI_CONF_KEY, targetUri.toString());
      TableMapReduceUtil.initCredentialsForCluster(job, jobConf, targetUri);
    } else if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
      initCredentialsForHBase(targetZkCluster, job);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(), SyncMapper.class,
      null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers. Just write straight to table. Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      if (targetUri != null) {
        TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetUri);
      } else {
        TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster);
      }
      // would be nice to add an option for bulk load instead
    }

    return job;
  }

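  /**
   * Mapper that scans the target table. For each hash batch from the source manifest it re-hashes
   * the corresponding target rows, and only when the batch hashes differ does it re-scan both
   * tables to emit the Puts and Deletes needed to bring the target in line with the source.
   */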
  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;
    boolean ignoreTimestamp;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

    public enum Counter {
      BATCHES,
      HASHES_MATCHED,
      HASHES_NOT_MATCHED,
      SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS,
      TARGETMISSINGROWS,
      TARGETMISSINGCELLS,
      ROWSWITHDIFFS,
      DIFFERENTCELLVALUES,
      MATCHINGROWS,
      MATCHINGCELLS,
      EMPTY_BATCHES,
      RANGESMATCHED,
      RANGESNOTMATCHED
    }

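    /**
     * Open connections and tables on both clusters, read the source hash manifest, and position
     * the hash reader at the first batch at or after this split's start row.
     */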
    @Override
    protected void setup(Context context) throws IOException {
      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection =
        openConnection(conf, SOURCE_URI_CONF_KEY, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_URI_CONF_KEY, TARGET_ZK_CLUSTER_CONF_KEY,
        TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before. they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
      targetHasher.ignoreTimestamps = ignoreTimestamp;
    }

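    /**
     * Create a cluster connection, preferring the connection uri key over the deprecated zk
     * cluster key when both are configured.
     */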
    private static Connection openConnection(Configuration conf, String uriConfKey,
      String zkClusterConfKey, String configPrefix) throws IOException {
      String uri = conf.get(uriConfKey);
      if (!StringUtils.isBlank(uri)) {
        try {
          return ConnectionFactory.createConnection(new URI(uri), conf);
        } catch (URISyntaxException e) {
          throw new IOException(
            "malformed connection uri: " + uri + ", please check config " + uriConfKey, e);
        }
      } else {
        String zkCluster = conf.get(zkClusterConfKey);
        Configuration clusterConf =
          HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
        return ConnectionFactory.createConnection(clusterConf);
      }
    }

    private static Table openTable(Connection connection, Configuration conf,
      String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair. If there are no more, set nextSourceKey to
     * null.
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

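    /**
     * Invoked once per target row, in row key order. Any hash batches that end at or before this
     * row's key are finished and compared first; the row itself is then folded into the currently
     * open batch.
     */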
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        throw t;
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs. Start a new batch,
     * and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

346
347    /**
348     * Finish the currently open hash batch. Compare the target hash to the given source hash. If
349     * they do not match, then sync the covered key range.
350     */
351    private void finishBatchAndCompareHashes(Context context)
352      throws IOException, InterruptedException {
353      targetHasher.finishBatch();
354      context.getCounter(Counter.BATCHES).increment(1);
355      if (targetHasher.getBatchSize() == 0) {
356        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
357      }
358      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
359      if (targetHash.equals(currentSourceHash)) {
360        context.getCounter(Counter.HASHES_MATCHED).increment(1);
361      } else {
362        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
363
364        ImmutableBytesWritable stopRow = nextSourceKey == null
365          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
366          : nextSourceKey;
367
368        if (LOG.isDebugEnabled()) {
369          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey()) + " to "
370            + toHex(stopRow) + " sourceHash: " + toHex(currentSourceHash) + " targetHash: "
371            + toHex(targetHash));
372        }
373
374        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
375      }
376    }
377
378    private static String toHex(ImmutableBytesWritable bytes) {
379      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
380    }
381
382    private static final CellScanner EMPTY_CELL_SCANNER =
383      new CellScanner(Collections.<Result> emptyIterator());
384
385    /**
386     * Rescan the given range directly from the source and target tables. Count and log differences,
387     * and if this is not a dry run, output Puts and Deletes to make the target table match the
388     * source table for this range
389     */
390    private void syncRange(Context context, ImmutableBytesWritable startRow,
391      ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
392      Scan scan = sourceTableHash.initScan();
393      scan.withStartRow(startRow.copyBytes());
394      scan.withStopRow(stopRow.copyBytes());
395
396      ResultScanner sourceScanner = sourceTable.getScanner(scan);
397      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
398
399      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
400      CellScanner targetCells = new CellScanner(targetScanner.iterator());
401
402      boolean rangeMatched = true;
403      byte[] nextSourceRow = sourceCells.nextRow();
404      byte[] nextTargetRow = targetCells.nextRow();
405      while (nextSourceRow != null || nextTargetRow != null) {
406        boolean rowMatched;
407        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
408        if (rowComparison < 0) {
409          if (LOG.isDebugEnabled()) {
410            LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
411          }
412          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
413
414          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
415          nextSourceRow = sourceCells.nextRow(); // advance only source to next row
416        } else if (rowComparison > 0) {
417          if (LOG.isDebugEnabled()) {
418            LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
419          }
420          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
421
422          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
423          nextTargetRow = targetCells.nextRow(); // advance only target to next row
424        } else {
425          // current row is the same on both sides, compare cell by cell
426          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
427          nextSourceRow = sourceCells.nextRow();
428          nextTargetRow = targetCells.nextRow();
429        }
430
431        if (!rowMatched) {
432          rangeMatched = false;
433        }
434      }
435
436      sourceScanner.close();
437      targetScanner.close();
438
439      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
440        .increment(1);
441    }
442
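    /**
     * Iterates the Cells of a ResultScanner one row at a time, stitching back together rows that
     * the scanner split across multiple Results (as happens when a scan batch limit is set).
     */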
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key. Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (
              currentRow == null || !Bytes.equals(currentRow, 0, currentRow.length,
                nextCell.getRowArray(), nextCell.getRowOffset(), nextCell.getRowLength())
            ) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (
              Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())
            ) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

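    /**
     * When timestamps are ignored, rebuild the source cell with the current time as its timestamp
     * so the Put written to the target does not carry over the source timestamp.
     */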
    private Cell checkAndResetTimestamp(Cell sourceCell) {
      if (ignoreTimestamp) {
        sourceCell =
          CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(sourceCell.getType())
            .setRow(sourceCell.getRowArray(), sourceCell.getRowOffset(), sourceCell.getRowLength())
            .setFamily(sourceCell.getFamilyArray(), sourceCell.getFamilyOffset(),
              sourceCell.getFamilyLength())
            .setQualifier(sourceCell.getQualifierArray(), sourceCell.getQualifierOffset(),
              sourceCell.getQualifierLength())
            .setTimestamp(EnvironmentEdgeManager.currentTime()).setValue(sourceCell.getValueArray(),
              sourceCell.getValueOffset(), sourceCell.getValueLength())
            .build();
      }
      return sourceCell;
    }

    /**
     * Compare the cells for the given row from the source and target tables. Count and log any
     * differences. If not a dry run, output a Put and/or Delete needed to sync the target table to
     * match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
      CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {

        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doPuts) {
            if (put == null) {
              put = new Put(rowKey);
            }
            sourceCell = checkAndResetTimestamp(sourceCell);
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doDeletes) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell), CellUtil.cloneQualifier(targetCell),
              targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell + " value: "
                + Bytes.toString(sourceCell.getValueArray(), sourceCell.getValueOffset(),
                  sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell + " value: "
                + Bytes.toString(targetCell.getValueArray(), targetCell.getValueOffset(),
                  targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun && doPuts) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              sourceCell = checkAndResetTimestamp(sourceCell);
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }


    /**
     * Compare the given row keys. Nulls sort after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1; // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable only runs against non-META tables, so we can compare the keys directly, which
        // is what CellComparator does internally; the call never needs MetaCellComparator.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells. They are assumed to be of
     * the same row. Nulls are after non-nulls.
     */
    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      if (this.ignoreTimestamp) {
        return 0;
      } else {
        // note timestamp comparison is inverted - more recent cells first
        return CellComparator.getInstance().compareTimestamps(c1, c2);
      }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.throwIfInstanceOf(mapperException, IOException.class);
        Throwables.throwIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.throwIfUnchecked(mapperException);
      }
    }

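    /**
     * Hash batches are not aligned with region boundaries, so the final open batch of this split
     * may extend beyond the split's end row. Scan the remainder of that range from the target
     * directly so the final batch can still be hashed and compared against the source.
     */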
    private void finishRemainingHashRanges(Context context)
      throws IOException, InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split, move to them
      while (
        nextSourceKey != null && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)
      ) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if (
          (nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
            || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))
        ) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.withStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.withStopRow(sourceTableHash.stopRow);
          } else {
            scan.withStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourceuri        Cluster connection uri of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println("                  Has no effect if sourceuri is specified");
    System.err.println("                  Deprecated, please use sourceuri instead");
    System.err.println(" targeturi        Cluster connection uri of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println("                  Has no effect if targeturi is specified");
    System.err.println("                  Deprecated, please use targeturi instead");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println(" doDeletes        if false, does not perform deletes");
    System.err.println("                  (defaults to true)");
    System.err.println(" doPuts           if false, does not perform puts");
    System.err.println("                  (defaults to true)");
    System.err.println(" ignoreTimestamps if true, ignores cell timestamps when comparing");
    System.err.println("                  cell values. Any missing cell on the target is then");
    System.err.println("                  added with the current time as its timestamp");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
      + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
      + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

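  /**
   * Parse the command line: the last three arguments are the hash dir, source table and target
   * table, preceded by optional --name=value flags. Returns false on bad input or -h/--help.
   */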
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }
        final String sourceUriKey = "--sourceuri=";
        if (cmd.startsWith(sourceUriKey)) {
          sourceUri = new URI(cmd.substring(sourceUriKey.length()));
          continue;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetUriKey = "--targeturi=";
        if (cmd.startsWith(targetUriKey)) {
          targetUri = new URI(cmd.substring(targetUriKey.length()));
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        final String doDeletesKey = "--doDeletes=";
        if (cmd.startsWith(doDeletesKey)) {
          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
          continue;
        }

        final String doPutsKey = "--doPuts=";
        if (cmd.startsWith(doPutsKey)) {
          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }

    } catch (Exception e) {
      LOG.error("Failed to parse command-line arguments", e);
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.error("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}