001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce.replication;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.UUID;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.conf.Configured;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.Get;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableSnapshotScanner;
041import org.apache.hadoop.hbase.filter.Filter;
042import org.apache.hadoop.hbase.filter.FilterList;
043import org.apache.hadoop.hbase.filter.PrefixFilter;
044import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
045import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
046import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
047import org.apache.hadoop.hbase.mapreduce.TableMapper;
048import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
049import org.apache.hadoop.hbase.mapreduce.TableSplit;
050import org.apache.hadoop.hbase.replication.ReplicationException;
051import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
052import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
054import org.apache.hadoop.hbase.replication.ReplicationUtils;
055import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.FSUtils;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.hbase.zookeeper.ZKConfig;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.apache.hadoop.mapreduce.InputSplit;
063import org.apache.hadoop.mapreduce.Job;
064import org.apache.hadoop.mapreduce.MRJobConfig;
065import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
066import org.apache.hadoop.util.Tool;
067import org.apache.hadoop.util.ToolRunner;
068import org.apache.yetus.audience.InterfaceAudience;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
073
074/**
075 * This map-only job compares the data from a local table with a remote one.
076 * Every cell is compared and must have exactly the same keys (even timestamp)
077 * as well as same value. It is possible to restrict the job by time range and
078 * families. The peer id that's provided must match the one given when the
079 * replication stream was setup.
080 * <p>
081 * Counters are provided in Verifier.Counters: GOODROWS and BADROWS, plus one counter per
082 * kind of bad row. The reason why a row is different is shown in the map's log.
083 */
084@InterfaceAudience.Private
085public class VerifyReplication extends Configured implements Tool {
086
087  private static final Logger LOG =
088      LoggerFactory.getLogger(VerifyReplication.class);
089
090  public final static String NAME = "verifyrep";
091  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
092  long startTime = 0;
093  long endTime = Long.MAX_VALUE;
094  int batch = -1;
095  int versions = -1;
096  String tableName = null;
097  String families = null;
098  String delimiter = "";
099  String peerId = null;
100  String peerQuorumAddress = null;
101  String rowPrefixes = null;
102  int sleepMsBeforeReCompare = 0;
103  boolean verbose = false;
104  boolean includeDeletedCells = false;
105  //Source table snapshot name
106  String sourceSnapshotName = null;
107  //Temp location in source cluster to restore source snapshot
108  String sourceSnapshotTmpDir = null;
109  //Peer table snapshot name
110  String peerSnapshotName = null;
111  //Temp location in peer cluster to restore peer snapshot
112  String peerSnapshotTmpDir = null;
113  //Peer cluster Hadoop FS address
114  String peerFSAddress = null;
115  //Peer cluster HBase root dir location
116  String peerHBaseRootAddress = null;
117  //Peer Table Name
118  String peerTableName = null;
119
120
121  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
122
123  /**
124   * Map-only comparator for 2 tables
125   */
126  public static class Verifier
127      extends TableMapper<ImmutableBytesWritable, Put> {
128
129    public enum Counters {
130      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
131    }
132
133    private Connection sourceConnection;
134    private Table sourceTable;
135    private Connection replicatedConnection;
136    private Table replicatedTable;
137    private ResultScanner replicatedScanner;
138    private Result currentCompareRowInPeerTable;
139    private int sleepMsBeforeReCompare;
140    private String delimiter = "";
141    private boolean verbose = false;
142    private int batch = -1;
143
144    /**
145     * Map method that compares every scanned row with the equivalent from
146     * a distant cluster.
147     * @param row  The current table row key.
148     * @param value  The columns.
149     * @param context  The current context.
150     * @throws IOException When something is broken with the data.
151     */
152    @Override
153    public void map(ImmutableBytesWritable row, final Result value,
154                    Context context)
155        throws IOException {
156      if (replicatedScanner == null) {
157        Configuration conf = context.getConfiguration();
158        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
159        delimiter = conf.get(NAME + ".delimiter", "");
160        verbose = conf.getBoolean(NAME +".verbose", false);
161        batch = conf.getInt(NAME + ".batch", -1);
162        final Scan scan = new Scan();
163        if (batch > 0) {
164          scan.setBatch(batch);
165        }
166        scan.setCacheBlocks(false);
167        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
168        long startTime = conf.getLong(NAME + ".startTime", 0);
169        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
170        String families = conf.get(NAME + ".families", null);
171        if(families != null) {
172          String[] fams = families.split(",");
173          for(String fam : fams) {
174            scan.addFamily(Bytes.toBytes(fam));
175          }
176        }
177        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
178        scan.setRaw(includeDeletedCells);
179        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
180        setRowPrefixFilter(scan, rowPrefixes);
181        scan.setTimeRange(startTime, endTime);
182        int versions = conf.getInt(NAME+".versions", -1);
183        LOG.info("Setting number of version inside map as: " + versions);
184        if (versions >= 0) {
185          scan.setMaxVersions(versions);
186        }
187        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
188        sourceConnection = ConnectionFactory.createConnection(conf);
189        sourceTable = sourceConnection.getTable(tableName);
190
191        final InputSplit tableSplit = context.getInputSplit();
192
193        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
194        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
195            zkClusterKey, PEER_CONFIG_PREFIX);
196
197        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
198        TableName peerTableName = TableName.valueOf(peerName);
199        replicatedConnection = ConnectionFactory.createConnection(peerConf);
200        replicatedTable = replicatedConnection.getTable(peerTableName);
201        scan.setStartRow(value.getRow());
202
203        byte[] endRow = null;
204        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
205          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
206              .getEndKey();
207        } else {
208          endRow = ((TableSplit) tableSplit).getEndRow();
209        }
210
211        scan.setStopRow(endRow);
212
213        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
214        if (peerSnapshotName != null) {
215          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
216          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
217          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
218          FileSystem.setDefaultUri(peerConf, peerFSAddress);
219          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
220          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
221              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
222              + " peerFSAddress:" + peerFSAddress);
223
224          replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
225              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
226        } else {
227          replicatedScanner = replicatedTable.getScanner(scan);
228        }
229        currentCompareRowInPeerTable = replicatedScanner.next();
230      }
231      while (true) {
232        if (currentCompareRowInPeerTable == null) {
233          // reach the region end of peer table, row only in source table
234          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
235          break;
236        }
237        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
238        if (rowCmpRet == 0) {
239          // rowkey is same, need to compare the content of the row
240          try {
241            Result.compareResults(value, currentCompareRowInPeerTable);
242            context.getCounter(Counters.GOODROWS).increment(1);
243            if (verbose) {
244              LOG.info("Good row key: " + delimiter
245                  + Bytes.toStringBinary(value.getRow()) + delimiter);
246            }
247          } catch (Exception e) {
248            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
249          }
250          currentCompareRowInPeerTable = replicatedScanner.next();
251          break;
252        } else if (rowCmpRet < 0) {
253          // row only exists in source table
254          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
255          break;
256        } else {
257          // row only exists in peer table
258          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
259            currentCompareRowInPeerTable);
260          currentCompareRowInPeerTable = replicatedScanner.next();
261        }
262      }
263    }
264
265    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
266      if (sleepMsBeforeReCompare > 0) {
267        Threads.sleep(sleepMsBeforeReCompare);
268        try {
269          Result sourceResult = sourceTable.get(new Get(row.getRow()));
270          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
271          Result.compareResults(sourceResult, replicatedResult);
272          if (!sourceResult.isEmpty()) {
273            context.getCounter(Counters.GOODROWS).increment(1);
274            if (verbose) {
275              LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
276              + delimiter);
277            }
278          }
279          return;
280        } catch (Exception e) {
281          LOG.error("recompare fail after sleep, rowkey=" + delimiter +
282              Bytes.toStringBinary(row.getRow()) + delimiter);
283        }
284      }
285      context.getCounter(counter).increment(1);
286      context.getCounter(Counters.BADROWS).increment(1);
287      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
288          delimiter);
289    }
290
291    @Override
292    protected void cleanup(Context context) {
293      if (replicatedScanner != null) {
294        try {
295          while (currentCompareRowInPeerTable != null) {
296            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
297              currentCompareRowInPeerTable);
298            currentCompareRowInPeerTable = replicatedScanner.next();
299          }
300        } catch (Exception e) {
301          LOG.error("fail to scan peer table in cleanup", e);
302        } finally {
303          replicatedScanner.close();
304          replicatedScanner = null;
305        }
306      }
307
308      if (sourceTable != null) {
309        try {
310          sourceTable.close();
311        } catch (IOException e) {
312          LOG.error("fail to close source table in cleanup", e);
313        }
314      }
315      if(sourceConnection != null){
316        try {
317          sourceConnection.close();
318        } catch (Exception e) {
319          LOG.error("fail to close source connection in cleanup", e);
320        }
321      }
322
323      if(replicatedTable != null){
324        try{
325          replicatedTable.close();
326        } catch (Exception e) {
327          LOG.error("fail to close replicated table in cleanup", e);
328        }
329      }
330      if(replicatedConnection != null){
331        try {
332          replicatedConnection.close();
333        } catch (Exception e) {
334          LOG.error("fail to close replicated connection in cleanup", e);
335        }
336      }
337    }
338  }
339
340  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
341      final Configuration conf, String peerId) throws IOException {
342    ZKWatcher localZKW = null;
343    try {
344      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
345        @Override
346        public void abort(String why, Throwable e) {
347        }
348
349        @Override
350        public boolean isAborted() {
351          return false;
352        }
353      });
354      ReplicationPeerStorage storage =
355        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
356      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
357      return Pair.newPair(peerConfig,
358        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
359    } catch (ReplicationException e) {
360      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
361          e);
362    } finally {
363      if (localZKW != null) {
364        localZKW.close();
365      }
366    }
367  }
368
369  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
370      throws IOException {
371    Configuration peerConf =
372        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
373    FileSystem.setDefaultUri(peerConf, peerFSAddress);
374    FSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
375    FileSystem fs = FileSystem.get(peerConf);
376    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, FSUtils.getRootDir(peerConf),
377      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
378  }
379
380  /**
381   * Sets up the actual job.
382   *
383   * @param conf  The current configuration.
384   * @param args  The command line parameters.
385   * @return The newly created job.
386   * @throws java.io.IOException When setting up the job fails.
387   */
388  public Job createSubmittableJob(Configuration conf, String[] args)
389  throws IOException {
390    if (!doCommandLine(args)) {
391      return null;
392    }
393    conf.set(NAME+".tableName", tableName);
394    conf.setLong(NAME+".startTime", startTime);
395    conf.setLong(NAME+".endTime", endTime);
396    conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
397    conf.set(NAME + ".delimiter", delimiter);
398    conf.setInt(NAME + ".batch", batch);
399    conf.setBoolean(NAME +".verbose", verbose);
400    conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
401    if (families != null) {
402      conf.set(NAME+".families", families);
403    }
404    if (rowPrefixes != null){
405      conf.set(NAME+".rowPrefixes", rowPrefixes);
406    }
407
408    String peerQuorumAddress;
409    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
410    if (peerId != null) {
411      peerConfigPair = getPeerQuorumConfig(conf, peerId);
412      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
413      peerQuorumAddress = peerConfig.getClusterKey();
414      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
415        peerConfig.getConfiguration());
416      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
417      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
418        peerConfig.getConfiguration().entrySet());
419    } else {
420      assert this.peerQuorumAddress != null;
421      peerQuorumAddress = this.peerQuorumAddress;
422      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
423      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
424    }
425
426    if (peerTableName != null) {
427      LOG.info("Peer Table Name: " + peerTableName);
428      conf.set(NAME + ".peerTableName", peerTableName);
429    }
430
431    conf.setInt(NAME + ".versions", versions);
432    LOG.info("Number of version: " + versions);
433
434    //Set Snapshot specific parameters
435    if (peerSnapshotName != null) {
436      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
437
438      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
439      // restore snapshot.
440      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
441      peerSnapshotTmpDir = restoreDir.toString();
442      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
443
444      conf.set(NAME + ".peerFSAddress", peerFSAddress);
445      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
446
447      // This is to create HDFS delegation token for peer cluster in case of secured
448      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
449    }
450
451    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
452    job.setJarByClass(VerifyReplication.class);
453
454    Scan scan = new Scan();
455    scan.setTimeRange(startTime, endTime);
456    scan.setRaw(includeDeletedCells);
457    scan.setCacheBlocks(false);
458    if (batch > 0) {
459      scan.setBatch(batch);
460    }
461    if (versions >= 0) {
462      scan.setMaxVersions(versions);
463      LOG.info("Number of versions set to " + versions);
464    }
465    if(families != null) {
466      String[] fams = families.split(",");
467      for(String fam : fams) {
468        scan.addFamily(Bytes.toBytes(fam));
469      }
470    }
471
472    setRowPrefixFilter(scan, rowPrefixes);
473
474    if (sourceSnapshotName != null) {
475      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
476      LOG.info(
477        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
478      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
479        null, job, true, snapshotTempPath);
480      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
481    } else {
482      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
483    }
484
485    if (peerId != null) {
486      assert peerConfigPair != null;
487      Configuration peerClusterConf = peerConfigPair.getSecond();
488      // Obtain the auth token from peer cluster
489      TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
490    }
491
492    job.setOutputFormatClass(NullOutputFormat.class);
493    job.setNumReduceTasks(0);
494    return job;
495  }
496
497  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
498    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
499      String[] rowPrefixArray = rowPrefixes.split(",");
500      Arrays.sort(rowPrefixArray);
501      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
502      for (String prefix : rowPrefixArray) {
503        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
504        filterList.addFilter(filter);
505      }
506      scan.setFilter(filterList);
507      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
508      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
509      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
510    }
511  }
512
513  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
514    scan.setStartRow(startPrefixRow);
515    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
516        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
517    scan.setStopRow(stopRow);
518  }
519
520  @VisibleForTesting
521  public boolean doCommandLine(final String[] args) {
522    if (args.length < 2) {
523      printUsage(null);
524      return false;
525    }
526    try {
527      for (int i = 0; i < args.length; i++) {
528        String cmd = args[i];
529        if (cmd.equals("-h") || cmd.startsWith("--h")) {
530          printUsage(null);
531          return false;
532        }
533
534        final String startTimeArgKey = "--starttime=";
535        if (cmd.startsWith(startTimeArgKey)) {
536          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
537          continue;
538        }
539
540        final String endTimeArgKey = "--endtime=";
541        if (cmd.startsWith(endTimeArgKey)) {
542          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
543          continue;
544        }
545
546        final String includeDeletedCellsArgKey = "--raw";
547        if (cmd.equals(includeDeletedCellsArgKey)) {
548          includeDeletedCells = true;
549          continue;
550        }
551
552        final String versionsArgKey = "--versions=";
553        if (cmd.startsWith(versionsArgKey)) {
554          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
555          continue;
556        }
557
558        final String batchArgKey = "--batch=";
559        if (cmd.startsWith(batchArgKey)) {
560          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
561          continue;
562        }
563
564        final String familiesArgKey = "--families=";
565        if (cmd.startsWith(familiesArgKey)) {
566          families = cmd.substring(familiesArgKey.length());
567          continue;
568        }
569
570        final String rowPrefixesKey = "--row-prefixes=";
571        if (cmd.startsWith(rowPrefixesKey)){
572          rowPrefixes = cmd.substring(rowPrefixesKey.length());
573          continue;
574        }
575
576        final String delimiterArgKey = "--delimiter=";
577        if (cmd.startsWith(delimiterArgKey)) {
578          delimiter = cmd.substring(delimiterArgKey.length());
579          continue;
580        }
581
582        final String sleepToReCompareKey = "--recomparesleep=";
583        if (cmd.startsWith(sleepToReCompareKey)) {
584          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
585          continue;
586        }
587        final String verboseKey = "--verbose";
588        if (cmd.startsWith(verboseKey)) {
589          verbose = true;
590          continue;
591        }
592
593        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
594        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
595          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
596          continue;
597        }
598
599        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
600        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
601          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
602          continue;
603        }
604
605        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
606        if (cmd.startsWith(peerSnapshotNameArgKey)) {
607          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
608          continue;
609        }
610
611        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
612        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
613          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
614          continue;
615        }
616
617        final String peerFSAddressArgKey = "--peerFSAddress=";
618        if (cmd.startsWith(peerFSAddressArgKey)) {
619          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
620          continue;
621        }
622
623        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
624        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
625          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
626          continue;
627        }
628
629        final String peerTableNameArgKey = "--peerTableName=";
630        if (cmd.startsWith(peerTableNameArgKey)) {
631          peerTableName = cmd.substring(peerTableNameArgKey.length());
632          continue;
633        }
634
635        if (cmd.startsWith("--")) {
636          printUsage("Invalid argument '" + cmd + "'");
637          return false;
638        }
639
640        if (i == args.length-2) {
641          if (isPeerQuorumAddress(cmd)) {
642            peerQuorumAddress = cmd;
643          } else {
644            peerId = cmd;
645          }
646        }
647
648        if (i == args.length-1) {
649          tableName = cmd;
650        }
651      }
652
653      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
654          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
655        printUsage("Source snapshot name and snapshot temp location should be provided"
656            + " to use snapshots in source cluster");
657        return false;
658      }
659
660      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
661          || peerHBaseRootAddress != null) {
662        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
663            || peerHBaseRootAddress == null) {
664          printUsage(
665            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
666                + "peer FSAddress should be provided to use snapshots in peer cluster");
667          return false;
668        }
669      }
670
671      // This is to avoid making recompare calls to source/peer tables when snapshots are used
672      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
673        printUsage(
674          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
675        return false;
676      }
677
678    } catch (Exception e) {
679      e.printStackTrace();
680      printUsage("Can't start because " + e.getMessage());
681      return false;
682    }
683    return true;
684  }
685
686  private boolean isPeerQuorumAddress(String cmd) {
687    try {
688      ZKConfig.validateClusterKey(cmd);
689    } catch (IOException e) {
690      // not a quorum address
691      return false;
692    }
693    return true;
694  }
695
696  /*
697   * @param errorMsg Error message.  Can be null.
698   */
699  private static void printUsage(final String errorMsg) {
700    if (errorMsg != null && errorMsg.length() > 0) {
701      System.err.println("ERROR: " + errorMsg);
702    }
703    System.err.println("Usage: verifyrep [--starttime=X]"
704        + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
705        + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
706        + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
707        + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
708    System.err.println();
709    System.err.println("Options:");
710    System.err.println(" starttime    beginning of the time range");
711    System.err.println("              without endtime means from starttime to forever");
712    System.err.println(" endtime      end of the time range");
713    System.err.println(" versions     number of cell versions to verify");
714    System.err.println(" batch        batch count for scan, " +
715        "note that result row counts will no longer be actual number of rows when you use this option");
716    System.err.println(" raw          includes raw scan if given in options");
717    System.err.println(" families     comma-separated list of families to copy");
718    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
719    System.err.println(" delimiter    the delimiter used in display around rowkey");
720    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
721        "default value is 0 which disables the recompare.");
722    System.err.println(" verbose      logs row keys of good rows");
723    System.err.println(" peerTableName  Peer Table Name");
724    System.err.println(" sourceSnapshotName  Source Snapshot Name");
725    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
726    System.err.println(" peerSnapshotName  Peer Snapshot Name");
727    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
728    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
729    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
730    System.err.println();
731    System.err.println("Args:");
732    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
733    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
734      + "format is zk_quorum:zk_port:zk_hbase_path");
735    System.err.println(" tablename    Name of the table to verify");
736    System.err.println();
737    System.err.println("Examples:");
738    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
739    System.err.println(" $ hbase " +
740        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
741        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
742  }
743
744  @Override
745  public int run(String[] args) throws Exception {
746    Configuration conf = this.getConf();
747    Job job = createSubmittableJob(conf, args);
748    if (job != null) {
749      return job.waitForCompletion(true) ? 0 : 1;
750    }
751    return 1;
752  }
753
754  /**
755   * Main entry point.
756   *
757   * @param args  The command line parameters.
758   * @throws Exception When running the job fails.
759   */
760  public static void main(String[] args) throws Exception {
761    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
762    System.exit(res);
763  }
764}