/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

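/**
 * SyncTable is the second step of the HashTable/SyncTable comparison: given the output
 * directory produced by {@link HashTable} for a source table, it runs a MapReduce job over the
 * target table, compares per-batch hashes against the stored source hashes, and rescans only
 * the key ranges whose hashes differ, emitting Puts and/or Deletes (unless --dryrun=true) to
 * bring the target table in line with the source. See printUsage() below for the supported
 * options and an example invocation.
 */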
@InterfaceAudience.Private
public class SyncTable extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

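  // Keys written into the job configuration by createSubmittableJob() and read back in
  // SyncMapper.setup().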
  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
  static final String IGNORE_TIMESTAMPS = "sync.table.ignore.timestamps";

  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;
  boolean ignoreTimestamps;

  Counters counters;

  public SyncTable(Configuration conf) {
    super(conf);
  }

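  /**
   * Obtain HBase delegation tokens for the cluster identified by the given ZooKeeper quorum,
   * if that cluster is configured for Kerberos authentication.
   */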
  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
    if ("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) {
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
  }

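  /**
   * Set up the MapReduce job: validate the HashTable output under sourceHashDir, copy the tool
   * options into the job configuration, and configure SyncMapper to scan the target table with
   * no reducers (writes go straight to the target table unless this is a dry run).
   */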
  public Job createSubmittableJob(String[] args) throws IOException {
    FileSystem fs = sourceHashDir.getFileSystem(getConf());
    if (!fs.exists(sourceHashDir)) {
      throw new IOException("Source hash dir not found: " + sourceHashDir);
    }

    Job job = Job.getInstance(getConf(), getConf().get("mapreduce.job.name",
        "syncTable_" + sourceTableName + "-" + targetTableName));
    Configuration jobConf = job.getConfiguration();
    if ("kerberos".equalsIgnoreCase(jobConf.get("hadoop.security.authentication"))) {
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { sourceHashDir }, getConf());
    }

    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
    LOG.info("Read source hash manifest: " + tableHash);
    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
    if (!tableHash.tableName.equals(sourceTableName)) {
      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
          + tableHash.tableName + " but job is reading from: " + sourceTableName);
    }
    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the manifest file"
          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
    int dataSubdirCount = 0;
    for (FileStatus file : fs.listStatus(dataDir)) {
      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
        dataSubdirCount++;
      }
    }

    if (dataSubdirCount != tableHash.numHashFiles) {
      throw new RuntimeException("Hash data appears corrupt. The number of hash files created"
          + " should be 1 more than the number of partition keys. However, the number of data dirs"
          + " found is " + dataSubdirCount + " but the number of partition keys"
          + " found in the partitions file is " + tableHash.partitions.size());
    }

    job.setJarByClass(HashTable.class);
    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
    if (sourceZkCluster != null) {
      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
      initCredentialsForHBase(sourceZkCluster, job);
    }
    if (targetZkCluster != null) {
      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
      initCredentialsForHBase(targetZkCluster, job);
    }
    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
    jobConf.setBoolean(IGNORE_TIMESTAMPS, ignoreTimestamps);

    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
        SyncMapper.class, null, null, job);

    job.setNumReduceTasks(0);

    if (dryRun) {
      job.setOutputFormatClass(NullOutputFormat.class);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // because it sets up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
          targetZkCluster, null, null);

      // would be nice to add an option for bulk load instead
    }

    // Obtain an authentication token, for the specified cluster, on behalf of the current user
    if (sourceZkCluster != null) {
      Configuration peerConf =
          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
    }
    return job;
  }

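  /**
   * Mapper that scans the target table over its split's key range, hashes rows in the same
   * batches used by HashTable, and compares each batch hash against the hash read from the
   * source manifest. Ranges whose hashes differ are rescanned cell by cell from both clusters,
   * and (unless this is a dry run) Puts and Deletes are emitted to make the target match the
   * source.
   */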
  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    Path sourceHashDir;

    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;
    boolean ignoreTimestamp;

    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    ImmutableBytesWritable currentSourceHash;
    ImmutableBytesWritable nextSourceKey;
    HashTable.ResultHasher targetHasher;

    Throwable mapperException;

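    // MapReduce counters updated while comparing; the totals can be read from the Job once it
    // completes (see SyncTable#counters).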
    public static enum Counter { BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED
    }

    @Override
    protected void setup(Context context) throws IOException {

      Configuration conf = context.getConfiguration();
      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
          TableOutputFormat.OUTPUT_CONF_PREFIX);
      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
      ignoreTimestamp = conf.getBoolean(IGNORE_TIMESTAMPS, false);

      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
      LOG.info("Read source hash manifest: " + sourceTableHash);
      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");

      TableSplit split = (TableSplit) context.getInputSplit();
      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());

      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
      findNextKeyHashPair();

      // create a hasher, but don't start it right away
      // instead, find the first hash batch at or after the start row
      // and skip any rows that come before.  they will be caught by the previous task
      targetHasher = new HashTable.ResultHasher();
      targetHasher.ignoreTimestamps = ignoreTimestamp;
    }

    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
        String configPrefix) throws IOException {
      String zkCluster = conf.get(zkClusterConfKey);
      Configuration clusterConf =
          HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
      return ConnectionFactory.createConnection(clusterConf);
    }

    private static Table openTable(Connection connection, Configuration conf,
        String tableNameConfKey) throws IOException {
      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
    }

    /**
     * Attempt to read the next source key/hash pair.
     * If there are no more, set nextSourceKey to null
     */
    private void findNextKeyHashPair() throws IOException {
      boolean hasNext = sourceHashReader.next();
      if (hasNext) {
        nextSourceKey = sourceHashReader.getCurrentKey();
      } else {
        // no more keys - last hash goes to the end
        nextSourceKey = null;
      }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      try {
        // first, finish any hash batches that end before the scanned row
        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
          moveToNextBatch(context);
        }

        // next, add the scanned row (as long as we've reached the first batch)
        if (targetHasher.isBatchStarted()) {
          targetHasher.hashResult(value);
        }
      } catch (Throwable t) {
        mapperException = t;
        Throwables.propagateIfInstanceOf(t, IOException.class);
        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
        Throwables.propagate(t);
      }
    }

    /**
     * If there is an open hash batch, complete it and sync if there are diffs.
     * Start a new batch, and seek to read the next source key/hash pair.
     */
    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
      if (targetHasher.isBatchStarted()) {
        finishBatchAndCompareHashes(context);
      }
      targetHasher.startBatch(nextSourceKey);
      currentSourceHash = sourceHashReader.getCurrentHash();

      findNextKeyHashPair();
    }

    /**
     * Finish the currently open hash batch.
     * Compare the target hash to the given source hash.
     * If they do not match, then sync the covered key range.
     */
    private void finishBatchAndCompareHashes(Context context)
        throws IOException, InterruptedException {
      targetHasher.finishBatch();
      context.getCounter(Counter.BATCHES).increment(1);
      if (targetHasher.getBatchSize() == 0) {
        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
      }
      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
      if (targetHash.equals(currentSourceHash)) {
        context.getCounter(Counter.HASHES_MATCHED).increment(1);
      } else {
        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);

        ImmutableBytesWritable stopRow = nextSourceKey == null
                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
                                          : nextSourceKey;

        if (LOG.isDebugEnabled()) {
          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
              + " to " + toHex(stopRow)
              + " sourceHash: " + toHex(currentSourceHash)
              + " targetHash: " + toHex(targetHash));
        }

        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
      }
    }

    private static String toHex(ImmutableBytesWritable bytes) {
      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
    }

    private static final CellScanner EMPTY_CELL_SCANNER
      = new CellScanner(Collections.<Result>emptyIterator());

    /**
     * Rescan the given range directly from the source and target tables.
     * Count and log differences, and if this is not a dry run, output Puts and Deletes
     * to make the target table match the source table for this range
     */
    private void syncRange(Context context, ImmutableBytesWritable startRow,
        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
      Scan scan = sourceTableHash.initScan();
      scan.withStartRow(startRow.copyBytes());
      scan.withStopRow(stopRow.copyBytes());

      ResultScanner sourceScanner = sourceTable.getScanner(scan);
      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());

      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
      CellScanner targetCells = new CellScanner(targetScanner.iterator());

      boolean rangeMatched = true;
      byte[] nextSourceRow = sourceCells.nextRow();
      byte[] nextTargetRow = targetCells.nextRow();
      while (nextSourceRow != null || nextTargetRow != null) {
        boolean rowMatched;
        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
        if (rowComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing row: " + Bytes.toString(nextSourceRow));
          }
          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
        } else if (rowComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing row: " + Bytes.toString(nextTargetRow));
          }
          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);

          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
          nextTargetRow = targetCells.nextRow();  // advance only target to next row
        } else {
          // current row is the same on both sides, compare cell by cell
          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
          nextSourceRow = sourceCells.nextRow();
          nextTargetRow = targetCells.nextRow();
        }

        if (!rowMatched) {
          rangeMatched = false;
        }
      }

      sourceScanner.close();
      targetScanner.close();

      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
        .increment(1);
    }

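    /**
     * Iterates the cells of a stream of scan Results one row at a time. A single row may span
     * multiple Results when scan batching is in effect, so consecutive Results with the same
     * row key are stitched back together.
     */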
    private static class CellScanner {
      private final Iterator<Result> results;

      private byte[] currentRow;
      private Result currentRowResult;
      private int nextCellInRow;

      private Result nextRowResult;

      public CellScanner(Iterator<Result> results) {
        this.results = results;
      }

      /**
       * Advance to the next row and return its row key.
       * Returns null iff there are no more rows.
       */
      public byte[] nextRow() {
        if (nextRowResult == null) {
          // no cached row - check scanner for more
          while (results.hasNext()) {
            nextRowResult = results.next();
            Cell nextCell = nextRowResult.rawCells()[0];
            if (currentRow == null
                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
                nextCell.getRowOffset(), nextCell.getRowLength())) {
              // found next row
              break;
            } else {
              // found another result from current row, keep scanning
              nextRowResult = null;
            }
          }

          if (nextRowResult == null) {
            // end of data, no more rows
            currentRowResult = null;
            currentRow = null;
            return null;
          }
        }

        // advance to cached result for next row
        currentRowResult = nextRowResult;
        nextCellInRow = 0;
        currentRow = currentRowResult.getRow();
        nextRowResult = null;
        return currentRow;
      }

      /**
       * Returns the next Cell in the current row or null iff none remain.
       */
      public Cell nextCellInRow() {
        if (currentRowResult == null) {
          // nothing left in current row
          return null;
        }

        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
        nextCellInRow++;
        if (nextCellInRow == currentRowResult.size()) {
          if (results.hasNext()) {
            Result result = results.next();
            Cell cell = result.rawCells()[0];
            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
                cell.getRowOffset(), cell.getRowLength())) {
              // result is part of current row
              currentRowResult = result;
              nextCellInRow = 0;
            } else {
              // result is part of next row, cache it
              nextRowResult = result;
              // current row is complete
              currentRowResult = null;
            }
          } else {
            // end of data
            currentRowResult = null;
          }
        }
        return nextCell;
      }
    }

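    /**
     * When ignoreTimestamp is set, rebuild the source cell with the current wall-clock time as
     * its timestamp before it is added to a Put, so the original source timestamp is not
     * written to the target. Otherwise the cell is returned unchanged.
     */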
    private Cell checkAndResetTimestamp(Cell sourceCell) {
      if (ignoreTimestamp) {
        sourceCell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
          .setType(sourceCell.getType())
          .setRow(sourceCell.getRowArray(),
            sourceCell.getRowOffset(), sourceCell.getRowLength())
          .setFamily(sourceCell.getFamilyArray(),
            sourceCell.getFamilyOffset(), sourceCell.getFamilyLength())
          .setQualifier(sourceCell.getQualifierArray(),
            sourceCell.getQualifierOffset(), sourceCell.getQualifierLength())
          .setTimestamp(System.currentTimeMillis())
          .setValue(sourceCell.getValueArray(),
            sourceCell.getValueOffset(), sourceCell.getValueLength()).build();
      }
      return sourceCell;
    }

    /**
     * Compare the cells for the given row from the source and target tables.
     * Count and log any differences.
     * If not a dry run, output a Put and/or Delete needed to sync the target table
     * to match the source table.
     */
    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
        CellScanner targetCells) throws IOException, InterruptedException {
      Put put = null;
      Delete delete = null;
      long matchingCells = 0;
      boolean matchingRow = true;
      Cell sourceCell = sourceCells.nextCellInRow();
      Cell targetCell = targetCells.nextCellInRow();
      while (sourceCell != null || targetCell != null) {

        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
        if (cellKeyComparison < 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Target missing cell: " + sourceCell);
          }
          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doPuts) {
            if (put == null) {
              put = new Put(rowKey);
            }
            sourceCell = checkAndResetTimestamp(sourceCell);
            put.add(sourceCell);
          }

          sourceCell = sourceCells.nextCellInRow();
        } else if (cellKeyComparison > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Source missing cell: " + targetCell);
          }
          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
          matchingRow = false;

          if (!dryRun && doDeletes) {
            if (delete == null) {
              delete = new Delete(rowKey);
            }
            // add a tombstone to exactly match the target cell that is missing on the source
            delete.addColumn(CellUtil.cloneFamily(targetCell),
                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
          }

          targetCell = targetCells.nextCellInRow();
        } else {
          // the cell keys are equal, now check values
          if (CellUtil.matchingValue(sourceCell, targetCell)) {
            matchingCells++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Different values: ");
              LOG.debug("  source cell: " + sourceCell
                  + " value: " + Bytes.toString(sourceCell.getValueArray(),
                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
              LOG.debug("  target cell: " + targetCell
                  + " value: " + Bytes.toString(targetCell.getValueArray(),
                      targetCell.getValueOffset(), targetCell.getValueLength()));
            }
            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
            matchingRow = false;

            if (!dryRun && doPuts) {
              // overwrite target cell
              if (put == null) {
                put = new Put(rowKey);
              }
              sourceCell = checkAndResetTimestamp(sourceCell);
              put.add(sourceCell);
            }
          }
          sourceCell = sourceCells.nextCellInRow();
          targetCell = targetCells.nextCellInRow();
        }

        if (!dryRun && sourceTableHash.scanBatch > 0) {
          if (put != null && put.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), put);
            put = null;
          }
          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
            context.write(new ImmutableBytesWritable(rowKey), delete);
            delete = null;
          }
        }
      }

      if (!dryRun) {
        if (put != null) {
          context.write(new ImmutableBytesWritable(rowKey), put);
        }
        if (delete != null) {
          context.write(new ImmutableBytesWritable(rowKey), delete);
        }
      }

      if (matchingCells > 0) {
        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
      }
      if (matchingRow) {
        context.getCounter(Counter.MATCHINGROWS).increment(1);
        return true;
      } else {
        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
        return false;
      }
    }

    /**
     * Compare the given row keys.
     * Nulls sort after non-nulls.
     */
    private static int compareRowKeys(byte[] r1, byte[] r2) {
      if (r1 == null) {
        return 1;  // source missing row
      } else if (r2 == null) {
        return -1; // target missing row
      } else {
        // SyncTable is only used on non-META tables, so we can compare row keys directly with
        // Bytes.compareTo, as CellComparator does internally; MetaCellComparator is never needed.
        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
      }
    }

    /**
     * Compare families, qualifiers, and timestamps of the given Cells.
     * They are assumed to be of the same row.
     * Nulls are after non-nulls.
     */
    private int compareCellKeysWithinRow(Cell c1, Cell c2) {
      if (c1 == null) {
        return 1; // source missing cell
      }
      if (c2 == null) {
        return -1; // target missing cell
      }

      int result = CellComparator.getInstance().compareFamilies(c1, c2);
      if (result != 0) {
        return result;
      }

      result = CellComparator.getInstance().compareQualifiers(c1, c2);
      if (result != 0) {
        return result;
      }

      if (this.ignoreTimestamp) {
        return 0;
      } else {
        // note timestamp comparison is inverted - more recent cells first
        return CellComparator.getInstance().compareTimestamps(c1, c2);
      }
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      if (mapperException == null) {
        try {
          finishRemainingHashRanges(context);
        } catch (Throwable t) {
          mapperException = t;
        }
      }

      try {
        sourceTable.close();
        targetTable.close();
        sourceConnection.close();
        targetConnection.close();
      } catch (Throwable t) {
        if (mapperException == null) {
          mapperException = t;
        } else {
          LOG.error("Suppressing exception from closing tables", t);
        }
      }

      // propagate first exception
      if (mapperException != null) {
        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
        Throwables.propagate(mapperException);
      }
    }

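    /**
     * Called from cleanup(): consume any hash batches that start before the end of this split,
     * and if the last open batch extends past the split boundary, scan the remainder of the
     * range from the target table directly so the final batch hash can still be compared.
     */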
    private void finishRemainingHashRanges(Context context) throws IOException,
        InterruptedException {
      TableSplit split = (TableSplit) context.getInputSplit();
      byte[] splitEndRow = split.getEndRow();
      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);

      // if there are more hash batches that begin before the end of this split move to them
      while (nextSourceKey != null
          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
        moveToNextBatch(context);
      }

      if (targetHasher.isBatchStarted()) {
        // need to complete the final open hash batch

        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
          // the open hash range continues past the end of this region
          // add a scan to complete the current hash range
          Scan scan = sourceTableHash.initScan();
          scan.withStartRow(splitEndRow);
          if (nextSourceKey == null) {
            scan.withStopRow(sourceTableHash.stopRow);
          } else {
            scan.withStopRow(nextSourceKey.copyBytes());
          }

          ResultScanner targetScanner = null;
          try {
            targetScanner = targetTable.getScanner(scan);
            for (Result row : targetScanner) {
              targetHasher.hashResult(row);
            }
          } finally {
            if (targetScanner != null) {
              targetScanner.close();
            }
          }
        } // else current batch ends exactly at split end row

        finishBatchAndCompareHashes(context);
      }
    }
  }

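  // Number of required positional arguments: <sourcehashdir> <sourcetable> <targettable>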
  private static final int NUM_ARGS = 3;

  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
      System.err.println();
    }
    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
    System.err.println();
    System.err.println("Options:");

    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" targetzkcluster  ZK cluster key of the target table");
    System.err.println("                  (defaults to cluster in classpath's config)");
    System.err.println(" dryrun           if true, output counters but no writes");
    System.err.println("                  (defaults to false)");
    System.err.println(" doDeletes        if false, does not perform deletes");
    System.err.println("                  (defaults to true)");
    System.err.println(" doPuts           if false, does not perform puts");
    System.err.println("                  (defaults to true)");
    System.err.println(" ignoreTimestamps if true, ignores cell timestamps while comparing");
    System.err.println("                  cell values. Any missing cell on target then gets");
    System.err.println("                  added with current time as timestamp");
    System.err.println("                  (defaults to false)");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
    System.err.println(" sourcetable      Name of the source table to sync from");
    System.err.println(" targettable      Name of the target table to sync to");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
    System.err.println(" to a local target cluster:");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
        + " hdfs://nn:9000/hashes/tableA tableA tableA");
  }

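  /**
   * Parse the three positional arguments and any --key=value options into this tool's fields.
   * Returns false (after printing usage) if the arguments are missing or invalid.
   */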
  private boolean doCommandLine(final String[] args) {
    if (args.length < NUM_ARGS) {
      printUsage(null);
      return false;
    }
    try {
      sourceHashDir = new Path(args[args.length - 3]);
      sourceTableName = args[args.length - 2];
      targetTableName = args[args.length - 1];

      for (int i = 0; i < args.length - NUM_ARGS; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String sourceZkClusterKey = "--sourcezkcluster=";
        if (cmd.startsWith(sourceZkClusterKey)) {
          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
          continue;
        }

        final String targetZkClusterKey = "--targetzkcluster=";
        if (cmd.startsWith(targetZkClusterKey)) {
          targetZkCluster = cmd.substring(targetZkClusterKey.length());
          continue;
        }

        final String dryRunKey = "--dryrun=";
        if (cmd.startsWith(dryRunKey)) {
          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
          continue;
        }

        final String doDeletesKey = "--doDeletes=";
        if (cmd.startsWith(doDeletesKey)) {
          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
          continue;
        }

        final String doPutsKey = "--doPuts=";
        if (cmd.startsWith(doPutsKey)) {
          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
          continue;
        }

        final String ignoreTimestampsKey = "--ignoreTimestamps=";
        if (cmd.startsWith(ignoreTimestampsKey)) {
          ignoreTimestamps = Boolean.parseBoolean(cmd.substring(ignoreTimestampsKey.length()));
          continue;
        }

        printUsage("Invalid argument '" + cmd + "'");
        return false;
      }
    } catch (Exception e) {
      LOG.error("Failed to parse commandLine arguments", e);
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /**
   * Main entry point.
   */
  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
    if (!doCommandLine(otherArgs)) {
      return 1;
    }

    Job job = createSubmittableJob(otherArgs);
    if (!job.waitForCompletion(true)) {
      LOG.info("Map-reduce job failed!");
      return 1;
    }
    counters = job.getCounters();
    return 0;
  }

}