001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.Iterator;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparator;
030import org.apache.hadoop.hbase.CellUtil;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Delete;
036import org.apache.hadoop.hbase.client.Mutation;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Result;
039import org.apache.hadoop.hbase.client.ResultScanner;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Table;
042import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.apache.hadoop.mapreduce.Counters;
045import org.apache.hadoop.mapreduce.Job;
046import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
047import org.apache.hadoop.mapreduce.security.TokenCache;
048import org.apache.hadoop.util.GenericOptionsParser;
049import org.apache.hadoop.util.Tool;
050import org.apache.hadoop.util.ToolRunner;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
056
057@InterfaceAudience.Private
058public class SyncTable extends Configured implements Tool {
059
  private static final Logger LOG = LoggerFactory.getLogger(SyncTable.class);

  // Job configuration keys. createSubmittableJob() stores the driver's settings under these
  // keys and SyncMapper.setup() reads them back inside each map task.
  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
  static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
  static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
  static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";

  // Positional command-line arguments, assigned by doCommandLine().
  Path sourceHashDir;
  String sourceTableName;
  String targetTableName;

  // Optional command-line settings, assigned by doCommandLine(); the ZK cluster keys stay
  // null when the corresponding option is not given.
  String sourceZkCluster;
  String targetZkCluster;
  boolean dryRun;
  boolean doDeletes = true;
  boolean doPuts = true;

  // Counters of the finished job; run() assigns this only after the job completed successfully.
  Counters counters;

  /**
   * Creates the tool with the given configuration.
   * @param conf configuration handed to {@link Configured}
   */
  public SyncTable(Configuration conf) {
    super(conf);
  }
086
087  private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
088    Configuration peerConf = HBaseConfiguration.createClusterConf(job
089            .getConfiguration(), zookeeper);
090    if(peerConf.get("hbase.security.authentication").equals("kerberos")){
091      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
092    }
093  }
094
095  public Job createSubmittableJob(String[] args) throws IOException {
096    FileSystem fs = sourceHashDir.getFileSystem(getConf());
097    if (!fs.exists(sourceHashDir)) {
098      throw new IOException("Source hash dir not found: " + sourceHashDir);
099    }
100
101    Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
102        "syncTable_" + sourceTableName + "-" + targetTableName));
103    Configuration jobConf = job.getConfiguration();
104    if (jobConf.get("hadoop.security.authentication").equals("kerberos")) {
105      TokenCache.obtainTokensForNamenodes(job.getCredentials(), new
106          Path[] { sourceHashDir }, getConf());
107    }
108
109    HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
110    LOG.info("Read source hash manifest: " + tableHash);
111    LOG.info("Read " + tableHash.partitions.size() + " partition keys");
112    if (!tableHash.tableName.equals(sourceTableName)) {
113      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
114          + tableHash.tableName + " but job is reading from: " + sourceTableName);
115    }
116    if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
117      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
118          + " should be 1 more than the number of partition keys.  However, the manifest file "
119          + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
120          + " found in the partitions file is " + tableHash.partitions.size());
121    }
122
123    Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
124    int dataSubdirCount = 0;
125    for (FileStatus file : fs.listStatus(dataDir)) {
126      if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
127        dataSubdirCount++;
128      }
129    }
130
131    if (dataSubdirCount != tableHash.numHashFiles) {
132      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
133          + " should be 1 more than the number of partition keys.  However, the number of data dirs"
134          + " found is " + dataSubdirCount + " but the number of partition keys"
135          + " found in the partitions file is " + tableHash.partitions.size());
136    }
137
138    job.setJarByClass(HashTable.class);
139    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
140    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
141    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
142    if (sourceZkCluster != null) {
143      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
144      initCredentialsForHBase(sourceZkCluster, job);
145    }
146    if (targetZkCluster != null) {
147      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
148      initCredentialsForHBase(targetZkCluster, job);
149    }
150    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
151    jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
152    jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
153
154    TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
155        SyncMapper.class, null, null, job);
156
157    job.setNumReduceTasks(0);
158
159    if (dryRun) {
160      job.setOutputFormatClass(NullOutputFormat.class);
161    } else {
162      // No reducers.  Just write straight to table.  Call initTableReducerJob
163      // because it sets up the TableOutputFormat.
164      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
165          targetZkCluster, null, null);
166
167      // would be nice to add an option for bulk load instead
168    }
169
170    // Obtain an authentication token, for the specified cluster, on behalf of the current user
171    if (sourceZkCluster != null) {
172      Configuration peerConf =
173          HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster);
174      TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
175    }
176    return job;
177  }
178
179  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
    // Location of the source hash data, read from SOURCE_HASH_DIR_CONF_KEY in setup().
    Path sourceHashDir;

    // Connections and tables opened in setup() and closed in cleanup().
    Connection sourceConnection;
    Connection targetConnection;
    Table sourceTable;
    Table targetTable;
    // Job settings read from the configuration in setup().
    boolean dryRun;
    boolean doDeletes = true;
    boolean doPuts = true;

    // Manifest of the source hash data and the reader used to walk its key/hash pairs.
    HashTable.TableHash sourceTableHash;
    HashTable.TableHash.Reader sourceHashReader;
    // Source hash of the batch that is currently open on targetHasher.
    ImmutableBytesWritable currentSourceHash;
    // Start key of the next source batch; null once the reader has no more pairs.
    ImmutableBytesWritable nextSourceKey;
    // Hashes the target rows of the currently open batch.
    HashTable.ResultHasher targetHasher;

    // Failure recorded by map() or cleanup(); cleanup() rethrows it at the end.
    Throwable mapperException;

    // Job counters incremented by this mapper.
    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
201
202    @Override
203    protected void setup(Context context) throws IOException {
204
205      Configuration conf = context.getConfiguration();
206      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
207      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
208      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
209          TableOutputFormat.OUTPUT_CONF_PREFIX);
210      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
211      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
212      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
213      doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
214      doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
215
216      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
217      LOG.info("Read source hash manifest: " + sourceTableHash);
218      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
219
220      TableSplit split = (TableSplit) context.getInputSplit();
221      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
222
223      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
224      findNextKeyHashPair();
225
226      // create a hasher, but don't start it right away
227      // instead, find the first hash batch at or after the start row
228      // and skip any rows that come before.  they will be caught by the previous task
229      targetHasher = new HashTable.ResultHasher();
230    }
231
232    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
233                                             String configPrefix)
234      throws IOException {
235        String zkCluster = conf.get(zkClusterConfKey);
236        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
237            zkCluster, configPrefix);
238        return ConnectionFactory.createConnection(clusterConf);
239    }
240
241    private static Table openTable(Connection connection, Configuration conf,
242        String tableNameConfKey) throws IOException {
243      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
244    }
245
246    /**
247     * Attempt to read the next source key/hash pair.
248     * If there are no more, set nextSourceKey to null
249     */
250    private void findNextKeyHashPair() throws IOException {
251      boolean hasNext = sourceHashReader.next();
252      if (hasNext) {
253        nextSourceKey = sourceHashReader.getCurrentKey();
254      } else {
255        // no more keys - last hash goes to the end
256        nextSourceKey = null;
257      }
258    }
259
260    @Override
261    protected void map(ImmutableBytesWritable key, Result value, Context context)
262        throws IOException, InterruptedException {
263      try {
264        // first, finish any hash batches that end before the scanned row
265        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
266          moveToNextBatch(context);
267        }
268
269        // next, add the scanned row (as long as we've reached the first batch)
270        if (targetHasher.isBatchStarted()) {
271          targetHasher.hashResult(value);
272        }
273      } catch (Throwable t) {
274        mapperException = t;
275        Throwables.propagateIfInstanceOf(t, IOException.class);
276        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
277        Throwables.propagate(t);
278      }
279    }
280
281    /**
282     * If there is an open hash batch, complete it and sync if there are diffs.
283     * Start a new batch, and seek to read the
284     */
285    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
286      if (targetHasher.isBatchStarted()) {
287        finishBatchAndCompareHashes(context);
288      }
289      targetHasher.startBatch(nextSourceKey);
290      currentSourceHash = sourceHashReader.getCurrentHash();
291
292      findNextKeyHashPair();
293    }
294
295    /**
296     * Finish the currently open hash batch.
297     * Compare the target hash to the given source hash.
298     * If they do not match, then sync the covered key range.
299     */
300    private void finishBatchAndCompareHashes(Context context)
301        throws IOException, InterruptedException {
302      targetHasher.finishBatch();
303      context.getCounter(Counter.BATCHES).increment(1);
304      if (targetHasher.getBatchSize() == 0) {
305        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
306      }
307      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
308      if (targetHash.equals(currentSourceHash)) {
309        context.getCounter(Counter.HASHES_MATCHED).increment(1);
310      } else {
311        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
312
313        ImmutableBytesWritable stopRow = nextSourceKey == null
314                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
315                                          : nextSourceKey;
316
317        if (LOG.isDebugEnabled()) {
318          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
319              + " to " + toHex(stopRow)
320              + " sourceHash: " + toHex(currentSourceHash)
321              + " targetHash: " + toHex(targetHash));
322        }
323
324        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
325      }
326    }
327    private static String toHex(ImmutableBytesWritable bytes) {
328      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
329    }
330
331    private static final CellScanner EMPTY_CELL_SCANNER
332      = new CellScanner(Collections.<Result>emptyIterator());
333
334    /**
335     * Rescan the given range directly from the source and target tables.
336     * Count and log differences, and if this is not a dry run, output Puts and Deletes
337     * to make the target table match the source table for this range
338     */
339    private void syncRange(Context context, ImmutableBytesWritable startRow,
340        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
341      Scan scan = sourceTableHash.initScan();
342      scan.setStartRow(startRow.copyBytes());
343      scan.setStopRow(stopRow.copyBytes());
344
345      ResultScanner sourceScanner = sourceTable.getScanner(scan);
346      CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
347
348      ResultScanner targetScanner = targetTable.getScanner(new Scan(scan));
349      CellScanner targetCells = new CellScanner(targetScanner.iterator());
350
351      boolean rangeMatched = true;
352      byte[] nextSourceRow = sourceCells.nextRow();
353      byte[] nextTargetRow = targetCells.nextRow();
354      while(nextSourceRow != null || nextTargetRow != null) {
355        boolean rowMatched;
356        int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
357        if (rowComparison < 0) {
358          if (LOG.isInfoEnabled()) {
359            LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
360          }
361          context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
362
363          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
364          nextSourceRow = sourceCells.nextRow();  // advance only source to next row
365        } else if (rowComparison > 0) {
366          if (LOG.isInfoEnabled()) {
367            LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
368          }
369          context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
370
371          rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
372          nextTargetRow = targetCells.nextRow();  // advance only target to next row
373        } else {
374          // current row is the same on both sides, compare cell by cell
375          rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
376          nextSourceRow = sourceCells.nextRow();
377          nextTargetRow = targetCells.nextRow();
378        }
379
380        if (!rowMatched) {
381          rangeMatched = false;
382        }
383      }
384
385      sourceScanner.close();
386      targetScanner.close();
387
388      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
389        .increment(1);
390    }
391
392    private static class CellScanner {
393      private final Iterator<Result> results;
394
395      private byte[] currentRow;
396      private Result currentRowResult;
397      private int nextCellInRow;
398
399      private Result nextRowResult;
400
401      public CellScanner(Iterator<Result> results) {
402        this.results = results;
403      }
404
405      /**
406       * Advance to the next row and return its row key.
407       * Returns null iff there are no more rows.
408       */
409      public byte[] nextRow() {
410        if (nextRowResult == null) {
411          // no cached row - check scanner for more
412          while (results.hasNext()) {
413            nextRowResult = results.next();
414            Cell nextCell = nextRowResult.rawCells()[0];
415            if (currentRow == null
416                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
417                nextCell.getRowOffset(), nextCell.getRowLength())) {
418              // found next row
419              break;
420            } else {
421              // found another result from current row, keep scanning
422              nextRowResult = null;
423            }
424          }
425
426          if (nextRowResult == null) {
427            // end of data, no more rows
428            currentRowResult = null;
429            currentRow = null;
430            return null;
431          }
432        }
433
434        // advance to cached result for next row
435        currentRowResult = nextRowResult;
436        nextCellInRow = 0;
437        currentRow = currentRowResult.getRow();
438        nextRowResult = null;
439        return currentRow;
440      }
441
442      /**
443       * Returns the next Cell in the current row or null iff none remain.
444       */
445      public Cell nextCellInRow() {
446        if (currentRowResult == null) {
447          // nothing left in current row
448          return null;
449        }
450
451        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
452        nextCellInRow++;
453        if (nextCellInRow == currentRowResult.size()) {
454          if (results.hasNext()) {
455            Result result = results.next();
456            Cell cell = result.rawCells()[0];
457            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
458                cell.getRowOffset(), cell.getRowLength())) {
459              // result is part of current row
460              currentRowResult = result;
461              nextCellInRow = 0;
462            } else {
463              // result is part of next row, cache it
464              nextRowResult = result;
465              // current row is complete
466              currentRowResult = null;
467            }
468          } else {
469            // end of data
470            currentRowResult = null;
471          }
472        }
473        return nextCell;
474      }
475    }
476
477    /**
478     * Compare the cells for the given row from the source and target tables.
479     * Count and log any differences.
480     * If not a dry run, output a Put and/or Delete needed to sync the target table
481     * to match the source table.
482     */
483    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
484        CellScanner targetCells) throws IOException, InterruptedException {
485      Put put = null;
486      Delete delete = null;
487      long matchingCells = 0;
488      boolean matchingRow = true;
489      Cell sourceCell = sourceCells.nextCellInRow();
490      Cell targetCell = targetCells.nextCellInRow();
491      while (sourceCell != null || targetCell != null) {
492
493        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
494        if (cellKeyComparison < 0) {
495          if (LOG.isDebugEnabled()) {
496            LOG.debug("Target missing cell: " + sourceCell);
497          }
498          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
499          matchingRow = false;
500
501          if (!dryRun && doPuts) {
502            if (put == null) {
503              put = new Put(rowKey);
504            }
505            put.add(sourceCell);
506          }
507
508          sourceCell = sourceCells.nextCellInRow();
509        } else if (cellKeyComparison > 0) {
510          if (LOG.isDebugEnabled()) {
511            LOG.debug("Source missing cell: " + targetCell);
512          }
513          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
514          matchingRow = false;
515
516          if (!dryRun && doDeletes) {
517            if (delete == null) {
518              delete = new Delete(rowKey);
519            }
520            // add a tombstone to exactly match the target cell that is missing on the source
521            delete.addColumn(CellUtil.cloneFamily(targetCell),
522                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
523          }
524
525          targetCell = targetCells.nextCellInRow();
526        } else {
527          // the cell keys are equal, now check values
528          if (CellUtil.matchingValue(sourceCell, targetCell)) {
529            matchingCells++;
530          } else {
531            if (LOG.isDebugEnabled()) {
532              LOG.debug("Different values: ");
533              LOG.debug("  source cell: " + sourceCell
534                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
535                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
536              LOG.debug("  target cell: " + targetCell
537                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
538                      targetCell.getValueOffset(), targetCell.getValueLength()));
539            }
540            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
541            matchingRow = false;
542
543            if (!dryRun && doPuts) {
544              // overwrite target cell
545              if (put == null) {
546                put = new Put(rowKey);
547              }
548              put.add(sourceCell);
549            }
550          }
551          sourceCell = sourceCells.nextCellInRow();
552          targetCell = targetCells.nextCellInRow();
553        }
554
555        if (!dryRun && sourceTableHash.scanBatch > 0) {
556          if (put != null && put.size() >= sourceTableHash.scanBatch) {
557            context.write(new ImmutableBytesWritable(rowKey), put);
558            put = null;
559          }
560          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
561            context.write(new ImmutableBytesWritable(rowKey), delete);
562            delete = null;
563          }
564        }
565      }
566
567      if (!dryRun) {
568        if (put != null) {
569          context.write(new ImmutableBytesWritable(rowKey), put);
570        }
571        if (delete != null) {
572          context.write(new ImmutableBytesWritable(rowKey), delete);
573        }
574      }
575
576      if (matchingCells > 0) {
577        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
578      }
579      if (matchingRow) {
580        context.getCounter(Counter.MATCHINGROWS).increment(1);
581        return true;
582      } else {
583        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
584        return false;
585      }
586    }
587
588    /**
589     * Compare row keys of the given Result objects.
590     * Nulls are after non-nulls
591     */
592    private static int compareRowKeys(byte[] r1, byte[] r2) {
593      if (r1 == null) {
594        return 1;  // source missing row
595      } else if (r2 == null) {
596        return -1; // target missing row
597      } else {
598        // Sync on no META tables only. We can directly do what CellComparator is doing inside.
599        // Never the call going to MetaCellComparator.
600        return Bytes.compareTo(r1, 0, r1.length, r2, 0, r2.length);
601      }
602    }
603
604    /**
605     * Compare families, qualifiers, and timestamps of the given Cells.
606     * They are assumed to be of the same row.
607     * Nulls are after non-nulls.
608     */
609     private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
610      if (c1 == null) {
611        return 1; // source missing cell
612      }
613      if (c2 == null) {
614        return -1; // target missing cell
615      }
616
617      int result = CellComparator.getInstance().compareFamilies(c1, c2);
618      if (result != 0) {
619        return result;
620      }
621
622      result = CellComparator.getInstance().compareQualifiers(c1, c2);
623      if (result != 0) {
624        return result;
625      }
626
627      // note timestamp comparison is inverted - more recent cells first
628      return CellComparator.getInstance().compareTimestamps(c1, c2);
629    }
630
631    @Override
632    protected void cleanup(Context context)
633        throws IOException, InterruptedException {
634      if (mapperException == null) {
635        try {
636          finishRemainingHashRanges(context);
637        } catch (Throwable t) {
638          mapperException = t;
639        }
640      }
641
642      try {
643        sourceTable.close();
644        targetTable.close();
645        sourceConnection.close();
646        targetConnection.close();
647      } catch (Throwable t) {
648        if (mapperException == null) {
649          mapperException = t;
650        } else {
651          LOG.error("Suppressing exception from closing tables", t);
652        }
653      }
654
655      // propagate first exception
656      if (mapperException != null) {
657        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
658        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
659        Throwables.propagate(mapperException);
660      }
661    }
662
663    private void finishRemainingHashRanges(Context context) throws IOException,
664        InterruptedException {
665      TableSplit split = (TableSplit) context.getInputSplit();
666      byte[] splitEndRow = split.getEndRow();
667      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
668
669      // if there are more hash batches that begin before the end of this split move to them
670      while (nextSourceKey != null
671          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
672        moveToNextBatch(context);
673      }
674
675      if (targetHasher.isBatchStarted()) {
676        // need to complete the final open hash batch
677
678        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
679              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
680          // the open hash range continues past the end of this region
681          // add a scan to complete the current hash range
682          Scan scan = sourceTableHash.initScan();
683          scan.setStartRow(splitEndRow);
684          if (nextSourceKey == null) {
685            scan.setStopRow(sourceTableHash.stopRow);
686          } else {
687            scan.setStopRow(nextSourceKey.copyBytes());
688          }
689
690          ResultScanner targetScanner = null;
691          try {
692            targetScanner = targetTable.getScanner(scan);
693            for (Result row : targetScanner) {
694              targetHasher.hashResult(row);
695            }
696          } finally {
697            if (targetScanner != null) {
698              targetScanner.close();
699            }
700          }
701        } // else current batch ends exactly at split end row
702
703        finishBatchAndCompareHashes(context);
704      }
705    }
706  }
707
708  private static final int NUM_ARGS = 3;
709  private static void printUsage(final String errorMsg) {
710    if (errorMsg != null && errorMsg.length() > 0) {
711      System.err.println("ERROR: " + errorMsg);
712      System.err.println();
713    }
714    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
715    System.err.println();
716    System.err.println("Options:");
717
718    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
719    System.err.println("                  (defaults to cluster in classpath's config)");
720    System.err.println(" targetzkcluster  ZK cluster key of the target table");
721    System.err.println("                  (defaults to cluster in classpath's config)");
722    System.err.println(" dryrun           if true, output counters but no writes");
723    System.err.println("                  (defaults to false)");
724    System.err.println(" doDeletes        if false, does not perform deletes");
725    System.err.println("                  (defaults to true)");
726    System.err.println(" doPuts           if false, does not perform puts ");
727    System.err.println("                  (defaults to true)");
728    System.err.println();
729    System.err.println("Args:");
730    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
731    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
732    System.err.println(" sourcetable      Name of the source table to sync from");
733    System.err.println(" targettable      Name of the target table to sync to");
734    System.err.println();
735    System.err.println("Examples:");
736    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
737    System.err.println(" to a local target cluster:");
738    System.err.println(" $ hbase " +
739        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
740        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
741        + " hdfs://nn:9000/hashes/tableA tableA tableA");
742  }
743
744  private boolean doCommandLine(final String[] args) {
745    if (args.length < NUM_ARGS) {
746      printUsage(null);
747      return false;
748    }
749    try {
750      sourceHashDir = new Path(args[args.length - 3]);
751      sourceTableName = args[args.length - 2];
752      targetTableName = args[args.length - 1];
753
754      for (int i = 0; i < args.length - NUM_ARGS; i++) {
755        String cmd = args[i];
756        if (cmd.equals("-h") || cmd.startsWith("--h")) {
757          printUsage(null);
758          return false;
759        }
760
761        final String sourceZkClusterKey = "--sourcezkcluster=";
762        if (cmd.startsWith(sourceZkClusterKey)) {
763          sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
764          continue;
765        }
766
767        final String targetZkClusterKey = "--targetzkcluster=";
768        if (cmd.startsWith(targetZkClusterKey)) {
769          targetZkCluster = cmd.substring(targetZkClusterKey.length());
770          continue;
771        }
772
773        final String dryRunKey = "--dryrun=";
774        if (cmd.startsWith(dryRunKey)) {
775          dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
776          continue;
777        }
778
779        final String doDeletesKey = "--doDeletes=";
780        if (cmd.startsWith(doDeletesKey)) {
781          doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
782          continue;
783        }
784
785        final String doPutsKey = "--doPuts=";
786        if (cmd.startsWith(doPutsKey)) {
787          doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
788          continue;
789        }
790
791        printUsage("Invalid argument '" + cmd + "'");
792        return false;
793      }
794
795
796    } catch (Exception e) {
797      e.printStackTrace();
798      printUsage("Can't start because " + e.getMessage());
799      return false;
800    }
801    return true;
802  }
803
804  /**
805   * Main entry point.
806   */
807  public static void main(String[] args) throws Exception {
808    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
809    System.exit(ret);
810  }
811
812  @Override
813  public int run(String[] args) throws Exception {
814    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
815    if (!doCommandLine(otherArgs)) {
816      return 1;
817    }
818
819    Job job = createSubmittableJob(otherArgs);
820    if (!job.waitForCompletion(true)) {
821      LOG.info("Map-reduce job failed!");
822      return 1;
823    }
824    counters = job.getCounters();
825    return 0;
826  }
827
828}