/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * This map-only job compares the data from a local table with a remote one.
 * Every cell is compared and must have exactly the same keys (even timestamp)
 * as well as the same value. It is possible to restrict the job by time range
 * and families. The peer id that's provided must match the one given when the
 * replication stream was set up.
 * <p>
 * Two counters are provided: Verifier.Counters.GOODROWS and BADROWS. The
 * reason why a row is different is shown in the map's log.
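 * <p>
 * Example invocation, using the sample values from the usage text below
 * (peer id 5, table TestTable, a one-hour time window):
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
 *     --starttime=1265875194289 --endtime=1265878794289 5 TestTable
 * </pre>
 * The tool can also be driven programmatically; a minimal sketch, mirroring
 * what {@link #main(String[])} does:
 * <pre>
 * int exitCode = ToolRunner.run(HBaseConfiguration.create(),
 *     new VerifyReplication(), args);
 * </pre>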
 */
@InterfaceAudience.Private
public class VerifyReplication extends Configured implements Tool {

  private static final Logger LOG =
      LoggerFactory.getLogger(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  int batch = -1;
  int versions = -1;
  String tableName = null;
  String families = null;
  String delimiter = "";
  String peerId = null;
  String rowPrefixes = null;
  int sleepMsBeforeReCompare = 0;
  boolean verbose = false;
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
  /**
   * Map-only comparator for two tables.
   */
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

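    /**
     * Per-row comparison outcomes; BADROWS is incremented alongside each of
     * the three more specific failure counters.
     */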
    public enum Counters {
      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
    }

    private Connection sourceConnection;
    private Table sourceTable;
    private Connection replicatedConnection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;

    /**
     * Map method that compares every scanned row with the equivalent row from
     * the remote cluster.
     * @param row  The current table row key.
     * @param value  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
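     * <p>
     * On the first call, this lazily opens connections to both clusters and a
     * scanner over the matching key range in the peer table; subsequent calls
     * advance the source and peer row streams in step.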
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
                    Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
        delimiter = conf.get(NAME + ".delimiter", "");
        verbose = conf.getBoolean(NAME + ".verbose", false);
        batch = conf.getInt(NAME + ".batch", -1);
        final Scan scan = new Scan();
        if (batch > 0) {
          scan.setBatch(batch);
        }
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
        scan.setRaw(includeDeletedCells);
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        sourceConnection = ConnectionFactory.createConnection(conf);
        sourceTable = sourceConnection.getTable(tableName);

        final InputSplit tableSplit = context.getInputSplit();

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        replicatedConnection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = replicatedConnection.getTable(tableName);
        scan.setStartRow(value.getRow());

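        // Stop the peer-side scan at this mapper's split boundary so that each
        // map task compares exactly one key range.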
        byte[] endRow = null;
        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
              .getEndKey();
        } else {
          endRow = ((TableSplit) tableSplit).getEndRow();
        }

        scan.setStopRow(endRow);

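        // When a peer snapshot is supplied, read the peer side directly from
        // the restored snapshot files instead of scanning the live peer table.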
        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
        if (peerSnapshotName != null) {
          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
          FileSystem.setDefaultUri(peerConf, peerFSAddress);
          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
          LOG.info("Using peer snapshot: " + peerSnapshotName + " with temp dir: "
              + peerSnapshotTmpDir + " peer root uri: " + FSUtils.getRootDir(peerConf)
              + " peerFSAddress: " + peerFSAddress);

          replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
        } else {
          replicatedScanner = replicatedTable.getScanner(scan);
        }
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
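      // Both scanners return rows in sorted key order, so advance the two
      // streams merge-style: equal keys are compared cell by cell, and a key
      // present on only one side is charged to the corresponding counter.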
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // reached the end of the peer table region; row exists only in the source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // row keys are equal, so compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key: " + delimiter
                  + Bytes.toStringBinary(value.getRow()) + delimiter);
            }
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in the source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // row only exists in the peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
            currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

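    /**
     * Logs a row that failed verification and increments the given counter as
     * well as BADROWS. If a recompare sleep is configured, both tables are
     * re-fetched and compared once more first, so rows that were merely still
     * in flight are counted as good instead.
     */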
    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      if (sleepMsBeforeReCompare > 0) {
        Threads.sleep(sleepMsBeforeReCompare);
        try {
          Result sourceResult = sourceTable.get(new Get(row.getRow()));
          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
          Result.compareResults(sourceResult, replicatedResult);
          if (!sourceResult.isEmpty()) {
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key (with recompare): " + delimiter
                  + Bytes.toStringBinary(row.getRow()) + delimiter);
            }
          }
          return;
        } catch (Exception e) {
          LOG.error("Recompare failed after sleep, rowkey=" + delimiter +
              Bytes.toStringBinary(row.getRow()) + delimiter);
        }
      }
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
          delimiter);
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("Failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }

      if (sourceTable != null) {
        try {
          sourceTable.close();
        } catch (IOException e) {
          LOG.error("Failed to close source table in cleanup", e);
        }
      }
      if (sourceConnection != null) {
        try {
          sourceConnection.close();
        } catch (Exception e) {
          LOG.error("Failed to close source connection in cleanup", e);
        }
      }

      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("Failed to close replicated table in cleanup", e);
        }
      }
      if (replicatedConnection != null) {
        try {
          replicatedConnection.close();
        } catch (Exception e) {
          LOG.error("Failed to close replicated connection in cleanup", e);
        }
      }
    }
  }

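  /**
   * Looks up the given peer's cluster key and configuration through the local
   * cluster's ZooKeeper, so the job can connect to the peer cluster.
   */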
  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
      final Configuration conf, String peerId) throws IOException {
    ZKWatcher localZKW = null;
    ReplicationPeerZKImpl peer = null;
    try {
      localZKW = new ZKWatcher(conf, "VerifyReplication",
          new Abortable() {
            @Override public void abort(String why, Throwable e) {}
            @Override public boolean isAborted() {return false;}
          });

      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
      rp.init();

      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
      if (pair == null) {
        throw new IOException("Couldn't get peer conf!");
      }

      return pair;
    } catch (ReplicationException e) {
      throw new IOException(
          "An error occurred while trying to connect to the remote peer cluster", e);
    } finally {
      if (peer != null) {
        peer.close();
      }
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }

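  /**
   * Restores the peer snapshot under the temp directory on the peer
   * filesystem so that mappers can scan it with a {@link TableSnapshotScanner}.
   */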
  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
      throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    FileSystem.setDefaultUri(peerConf, peerFSAddress);
    FSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
    FileSystem fs = FileSystem.get(peerConf);
    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, FSUtils.getRootDir(peerConf),
      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    conf.set(NAME + ".peerId", peerId);
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
    conf.set(NAME + ".delimiter", delimiter);
    conf.setInt(NAME + ".batch", batch);
    conf.setBoolean(NAME + ".verbose", verbose);
    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
    String peerQuorumAddress = peerConfig.getClusterKey();
    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
        peerConfig.getConfiguration());
    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
        peerConfig.getConfiguration().entrySet());

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    // Set snapshot-specific parameters
    if (peerSnapshotName != null) {
      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);

      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
      // restore the snapshot.
      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
      peerSnapshotTmpDir = restoreDir.toString();
      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);

      conf.set(NAME + ".peerFSAddress", peerFSAddress);
      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);

      // This is to create an HDFS delegation token for the peer cluster in case it is secured
      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    scan.setRaw(includeDeletedCells);
    scan.setCacheBlocks(false);
    if (batch > 0) {
      scan.setBatch(batch);
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    if (sourceSnapshotName != null) {
      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
      LOG.info(
        "Using source snapshot " + sourceSnapshotName + " with temp dir: " + sourceSnapshotTmpDir);
      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
        null, job, true, snapshotTempPath);
      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
    }
    Configuration peerClusterConf = peerConfigPair.getSecond();
    // Obtain the auth token from the peer cluster
    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

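  /**
   * Restricts the scan to [startPrefixRow, stopRow), where the stop row is the
   * last prefix with its final byte incremented; e.g. a last prefix of "abc"
   * gives the exclusive stop row "abd".
   */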
  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
        new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
    scan.setStopRow(stopRow);
  }

  @VisibleForTesting
  public boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String includeDeletedCellsArgKey = "--raw";
        if (cmd.equals(includeDeletedCellsArgKey)) {
          includeDeletedCells = true;
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        final String sleepToReCompareKey = "--recomparesleep=";
        if (cmd.startsWith(sleepToReCompareKey)) {
          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
          continue;
        }

        final String verboseKey = "--verbose";
        if (cmd.startsWith(verboseKey)) {
          verbose = true;
          continue;
        }

        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
          continue;
        }

        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        if (cmd.startsWith(peerSnapshotNameArgKey)) {
          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
          continue;
        }

        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerFSAddressArgKey = "--peerFSAddress=";
        if (cmd.startsWith(peerFSAddressArgKey)) {
          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
          continue;
        }

        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
          continue;
        }

        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }

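        // The remaining positional arguments are the peer id (second to last)
        // and the table name (last).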
        if (i == args.length - 2) {
          peerId = cmd;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }

      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
        printUsage("Source snapshot name and snapshot temp location should both be provided"
            + " to use snapshots in the source cluster");
        return false;
      }

      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
          || peerHBaseRootAddress != null) {
        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
            || peerHBaseRootAddress == null) {
          printUsage(
            "Peer snapshot name, peer snapshot temp location, peer HBase root address and "
                + "peer FS address should all be provided to use snapshots in the peer cluster");
          return false;
        }
      }

      // This is to avoid making recompare calls to source/peer tables when snapshots are used
      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
        printUsage(
          "Using recomparesleep along with snapshots is not allowed as snapshots are immutable");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]" +
        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
        "[--batch=] [--verbose] [--versions=N] [--raw] [--sourceSnapshotName=P] " +
        "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " +
        "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        batch count for scan, " +
        "note that result row counts will no longer be actual number of rows when you use this option");
    System.err.println(" raw          use a raw scan, which includes deleted cells");
    System.err.println(" families     comma-separated list of families to compare");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
    System.err.println(" delimiter    the delimiter printed around row keys in the logs");
    System.err.println(" recomparesleep   milliseconds to sleep before recomparing a row, " +
        "default value is 0 which disables the recompare");
    System.err.println(" verbose      logs row keys of good rows");
    System.err.println(" sourceSnapshotName  Source snapshot name");
    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName  Peer snapshot name");
    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}