001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce.replication;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.UUID;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.conf.Configured;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.ConnectionFactory;
034import org.apache.hadoop.hbase.client.Get;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.Result;
037import org.apache.hadoop.hbase.client.ResultScanner;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableSnapshotScanner;
041import org.apache.hadoop.hbase.filter.Filter;
042import org.apache.hadoop.hbase.filter.FilterList;
043import org.apache.hadoop.hbase.filter.PrefixFilter;
044import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
045import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
046import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
047import org.apache.hadoop.hbase.mapreduce.TableMapper;
048import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
049import org.apache.hadoop.hbase.mapreduce.TableSplit;
050import org.apache.hadoop.hbase.replication.ReplicationException;
051import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
052import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
053import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
054import org.apache.hadoop.hbase.replication.ReplicationUtils;
055import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.CommonFSUtils;
058import org.apache.hadoop.hbase.util.Pair;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.hbase.zookeeper.ZKConfig;
061import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
062import org.apache.hadoop.mapreduce.InputSplit;
063import org.apache.hadoop.mapreduce.Job;
064import org.apache.hadoop.mapreduce.MRJobConfig;
065import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
066import org.apache.hadoop.util.Tool;
067import org.apache.hadoop.util.ToolRunner;
068import org.apache.yetus.audience.InterfaceAudience;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072/**
073 * This map-only job compares the data from a local table with a remote one.
074 * Every cell is compared and must have exactly the same keys (even timestamp)
075 * as well as same value. It is possible to restrict the job by time range and
076 * families. The peer id that's provided must match the one given when the
077 * replication stream was setup.
078 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
 * why a row is different is shown in the map's log.
081 */
082@InterfaceAudience.Private
083public class VerifyReplication extends Configured implements Tool {
084
  // Logger shared by the tool and by the nested Verifier mapper.
  private static final Logger LOG =
      LoggerFactory.getLogger(VerifyReplication.class);

  // Tool name; also used as the prefix of every job configuration key written by this class.
  public final static String NAME = "verifyrep";
  // Prefix under which peer-cluster specific configuration entries are stored in the job conf.
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  // Lower bound of the scan time range (--starttime=).
  long startTime = 0;
  // Upper bound of the scan time range (--endtime=).
  long endTime = Long.MAX_VALUE;
  // Scan batch size (--batch=); only applied to the scan when greater than zero.
  int batch = -1;
  // Number of cell versions to scan (--versions=); only applied when non-negative.
  int versions = -1;
  // Source table name, taken from the last command line argument.
  String tableName = null;
  // Comma-separated list of column families (--families=).
  String families = null;
  // Text printed before and after row keys in log messages (--delimiter=).
  String delimiter = "";
  // Replication peer id, taken from the second-to-last argument when it is not a cluster key.
  String peerId = null;
  // Peer cluster key, taken from the second-to-last argument when it validates as a cluster key.
  String peerQuorumAddress = null;
  // Comma-separated list of row key prefixes (--row-prefixes=).
  String rowPrefixes = null;
  // Milliseconds to sleep before re-comparing a mismatching row (--recomparesleep=); 0 disables.
  int sleepMsBeforeReCompare = 0;
  // When true, keys of good rows are logged as well (--verbose).
  boolean verbose = false;
  // When true, scans are raw (--raw).
  boolean includeDeletedCells = false;
  // Source table snapshot name (--sourceSnapshotName=)
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot (--sourceSnapshotTmpDir=)
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name (--peerSnapshotName=)
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot (--peerSnapshotTmpDir=)
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address (--peerFSAddress=)
  String peerFSAddress = null;
  // Peer cluster HBase root dir location (--peerHBaseRootAddress=)
  String peerHBaseRootAddress = null;
  // Peer table name (--peerTableName=); the mapper falls back to the source table name when unset
  String peerTableName = null;


  // Configuration key consulted for a user supplied job name.
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
120
121  /**
122   * Map-only comparator for 2 tables
123   */
124  public static class Verifier
125      extends TableMapper<ImmutableBytesWritable, Put> {
126
127    public enum Counters {
128      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
129    }
130
131    private Connection sourceConnection;
132    private Table sourceTable;
133    private Connection replicatedConnection;
134    private Table replicatedTable;
135    private ResultScanner replicatedScanner;
136    private Result currentCompareRowInPeerTable;
137    private int sleepMsBeforeReCompare;
138    private String delimiter = "";
139    private boolean verbose = false;
140    private int batch = -1;
141
142    /**
143     * Map method that compares every scanned row with the equivalent from
144     * a distant cluster.
145     * @param row  The current table row key.
146     * @param value  The columns.
147     * @param context  The current context.
148     * @throws IOException When something is broken with the data.
149     */
150    @Override
151    public void map(ImmutableBytesWritable row, final Result value,
152                    Context context)
153        throws IOException {
154      if (replicatedScanner == null) {
155        Configuration conf = context.getConfiguration();
156        sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
157        delimiter = conf.get(NAME + ".delimiter", "");
158        verbose = conf.getBoolean(NAME +".verbose", false);
159        batch = conf.getInt(NAME + ".batch", -1);
160        final Scan scan = new Scan();
161        if (batch > 0) {
162          scan.setBatch(batch);
163        }
164        scan.setCacheBlocks(false);
165        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
166        long startTime = conf.getLong(NAME + ".startTime", 0);
167        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
168        String families = conf.get(NAME + ".families", null);
169        if(families != null) {
170          String[] fams = families.split(",");
171          for(String fam : fams) {
172            scan.addFamily(Bytes.toBytes(fam));
173          }
174        }
175        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
176        scan.setRaw(includeDeletedCells);
177        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
178        setRowPrefixFilter(scan, rowPrefixes);
179        scan.setTimeRange(startTime, endTime);
180        int versions = conf.getInt(NAME+".versions", -1);
181        LOG.info("Setting number of version inside map as: " + versions);
182        if (versions >= 0) {
183          scan.setMaxVersions(versions);
184        }
185        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
186        sourceConnection = ConnectionFactory.createConnection(conf);
187        sourceTable = sourceConnection.getTable(tableName);
188
189        final InputSplit tableSplit = context.getInputSplit();
190
191        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
192        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
193            zkClusterKey, PEER_CONFIG_PREFIX);
194
195        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
196        TableName peerTableName = TableName.valueOf(peerName);
197        replicatedConnection = ConnectionFactory.createConnection(peerConf);
198        replicatedTable = replicatedConnection.getTable(peerTableName);
199        scan.setStartRow(value.getRow());
200
201        byte[] endRow = null;
202        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
203          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
204              .getEndKey();
205        } else {
206          endRow = ((TableSplit) tableSplit).getEndRow();
207        }
208
209        scan.setStopRow(endRow);
210
211        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
212        if (peerSnapshotName != null) {
213          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
214          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
215          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
216          FileSystem.setDefaultUri(peerConf, peerFSAddress);
217          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
218          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" +
219            peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) +
220            " peerFSAddress:" + peerFSAddress);
221
222          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
223            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
224        } else {
225          replicatedScanner = replicatedTable.getScanner(scan);
226        }
227        currentCompareRowInPeerTable = replicatedScanner.next();
228      }
229      while (true) {
230        if (currentCompareRowInPeerTable == null) {
231          // reach the region end of peer table, row only in source table
232          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
233          break;
234        }
235        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
236        if (rowCmpRet == 0) {
237          // rowkey is same, need to compare the content of the row
238          try {
239            Result.compareResults(value, currentCompareRowInPeerTable);
240            context.getCounter(Counters.GOODROWS).increment(1);
241            if (verbose) {
242              LOG.info("Good row key: " + delimiter
243                  + Bytes.toStringBinary(value.getRow()) + delimiter);
244            }
245          } catch (Exception e) {
246            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
247          }
248          currentCompareRowInPeerTable = replicatedScanner.next();
249          break;
250        } else if (rowCmpRet < 0) {
251          // row only exists in source table
252          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
253          break;
254        } else {
255          // row only exists in peer table
256          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
257            currentCompareRowInPeerTable);
258          currentCompareRowInPeerTable = replicatedScanner.next();
259        }
260      }
261    }
262
263    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
264      if (sleepMsBeforeReCompare > 0) {
265        Threads.sleep(sleepMsBeforeReCompare);
266        try {
267          Result sourceResult = sourceTable.get(new Get(row.getRow()));
268          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
269          Result.compareResults(sourceResult, replicatedResult);
270          if (!sourceResult.isEmpty()) {
271            context.getCounter(Counters.GOODROWS).increment(1);
272            if (verbose) {
273              LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
274              + delimiter);
275            }
276          }
277          return;
278        } catch (Exception e) {
279          LOG.error("recompare fail after sleep, rowkey=" + delimiter +
280              Bytes.toStringBinary(row.getRow()) + delimiter);
281        }
282      }
283      context.getCounter(counter).increment(1);
284      context.getCounter(Counters.BADROWS).increment(1);
285      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
286          delimiter);
287    }
288
289    @Override
290    protected void cleanup(Context context) {
291      if (replicatedScanner != null) {
292        try {
293          while (currentCompareRowInPeerTable != null) {
294            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
295              currentCompareRowInPeerTable);
296            currentCompareRowInPeerTable = replicatedScanner.next();
297          }
298        } catch (Exception e) {
299          LOG.error("fail to scan peer table in cleanup", e);
300        } finally {
301          replicatedScanner.close();
302          replicatedScanner = null;
303        }
304      }
305
306      if (sourceTable != null) {
307        try {
308          sourceTable.close();
309        } catch (IOException e) {
310          LOG.error("fail to close source table in cleanup", e);
311        }
312      }
313      if(sourceConnection != null){
314        try {
315          sourceConnection.close();
316        } catch (Exception e) {
317          LOG.error("fail to close source connection in cleanup", e);
318        }
319      }
320
321      if(replicatedTable != null){
322        try{
323          replicatedTable.close();
324        } catch (Exception e) {
325          LOG.error("fail to close replicated table in cleanup", e);
326        }
327      }
328      if(replicatedConnection != null){
329        try {
330          replicatedConnection.close();
331        } catch (Exception e) {
332          LOG.error("fail to close replicated connection in cleanup", e);
333        }
334      }
335    }
336  }
337
338  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
339      final Configuration conf, String peerId) throws IOException {
340    ZKWatcher localZKW = null;
341    try {
342      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
343        @Override
344        public void abort(String why, Throwable e) {
345        }
346
347        @Override
348        public boolean isAborted() {
349          return false;
350        }
351      });
352      ReplicationPeerStorage storage =
353        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
354      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
355      return Pair.newPair(peerConfig,
356        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
357    } catch (ReplicationException e) {
358      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
359          e);
360    } finally {
361      if (localZKW != null) {
362        localZKW.close();
363      }
364    }
365  }
366
367  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
368    throws IOException {
369    Configuration peerConf =
370      HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
371    FileSystem.setDefaultUri(peerConf, peerFSAddress);
372    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
373    FileSystem fs = FileSystem.get(peerConf);
374    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
375      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
376  }
377
378  /**
379   * Sets up the actual job.
380   *
381   * @param conf  The current configuration.
382   * @param args  The command line parameters.
383   * @return The newly created job.
384   * @throws java.io.IOException When setting up the job fails.
385   */
386  public Job createSubmittableJob(Configuration conf, String[] args)
387  throws IOException {
388    if (!doCommandLine(args)) {
389      return null;
390    }
391    conf.set(NAME+".tableName", tableName);
392    conf.setLong(NAME+".startTime", startTime);
393    conf.setLong(NAME+".endTime", endTime);
394    conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
395    conf.set(NAME + ".delimiter", delimiter);
396    conf.setInt(NAME + ".batch", batch);
397    conf.setBoolean(NAME +".verbose", verbose);
398    conf.setBoolean(NAME +".includeDeletedCells", includeDeletedCells);
399    if (families != null) {
400      conf.set(NAME+".families", families);
401    }
402    if (rowPrefixes != null){
403      conf.set(NAME+".rowPrefixes", rowPrefixes);
404    }
405
406    String peerQuorumAddress;
407    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
408    if (peerId != null) {
409      peerConfigPair = getPeerQuorumConfig(conf, peerId);
410      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
411      peerQuorumAddress = peerConfig.getClusterKey();
412      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
413        peerConfig.getConfiguration());
414      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
415      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
416        peerConfig.getConfiguration().entrySet());
417    } else {
418      assert this.peerQuorumAddress != null;
419      peerQuorumAddress = this.peerQuorumAddress;
420      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
421      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
422    }
423
424    if (peerTableName != null) {
425      LOG.info("Peer Table Name: " + peerTableName);
426      conf.set(NAME + ".peerTableName", peerTableName);
427    }
428
429    conf.setInt(NAME + ".versions", versions);
430    LOG.info("Number of version: " + versions);
431
432    //Set Snapshot specific parameters
433    if (peerSnapshotName != null) {
434      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
435
436      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
437      // restore snapshot.
438      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
439      peerSnapshotTmpDir = restoreDir.toString();
440      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
441
442      conf.set(NAME + ".peerFSAddress", peerFSAddress);
443      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
444
445      // This is to create HDFS delegation token for peer cluster in case of secured
446      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
447    }
448
449    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
450    job.setJarByClass(VerifyReplication.class);
451
452    Scan scan = new Scan();
453    scan.setTimeRange(startTime, endTime);
454    scan.setRaw(includeDeletedCells);
455    scan.setCacheBlocks(false);
456    if (batch > 0) {
457      scan.setBatch(batch);
458    }
459    if (versions >= 0) {
460      scan.setMaxVersions(versions);
461      LOG.info("Number of versions set to " + versions);
462    }
463    if(families != null) {
464      String[] fams = families.split(",");
465      for(String fam : fams) {
466        scan.addFamily(Bytes.toBytes(fam));
467      }
468    }
469
470    setRowPrefixFilter(scan, rowPrefixes);
471
472    if (sourceSnapshotName != null) {
473      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
474      LOG.info(
475        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
476      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
477        null, job, true, snapshotTempPath);
478      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
479    } else {
480      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
481    }
482
483    if (peerId != null) {
484      assert peerConfigPair != null;
485      Configuration peerClusterConf = peerConfigPair.getSecond();
486      // Obtain the auth token from peer cluster
487      TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
488    }
489
490    job.setOutputFormatClass(NullOutputFormat.class);
491    job.setNumReduceTasks(0);
492    return job;
493  }
494
495  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
496    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
497      String[] rowPrefixArray = rowPrefixes.split(",");
498      Arrays.sort(rowPrefixArray);
499      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
500      for (String prefix : rowPrefixArray) {
501        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
502        filterList.addFilter(filter);
503      }
504      scan.setFilter(filterList);
505      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
506      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length -1]);
507      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
508    }
509  }
510
511  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
512    scan.setStartRow(startPrefixRow);
513    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
514        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
515    scan.setStopRow(stopRow);
516  }
517
518  public boolean doCommandLine(final String[] args) {
519    if (args.length < 2) {
520      printUsage(null);
521      return false;
522    }
523    try {
524      for (int i = 0; i < args.length; i++) {
525        String cmd = args[i];
526        if (cmd.equals("-h") || cmd.startsWith("--h")) {
527          printUsage(null);
528          return false;
529        }
530
531        final String startTimeArgKey = "--starttime=";
532        if (cmd.startsWith(startTimeArgKey)) {
533          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
534          continue;
535        }
536
537        final String endTimeArgKey = "--endtime=";
538        if (cmd.startsWith(endTimeArgKey)) {
539          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
540          continue;
541        }
542
543        final String includeDeletedCellsArgKey = "--raw";
544        if (cmd.equals(includeDeletedCellsArgKey)) {
545          includeDeletedCells = true;
546          continue;
547        }
548
549        final String versionsArgKey = "--versions=";
550        if (cmd.startsWith(versionsArgKey)) {
551          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
552          continue;
553        }
554
555        final String batchArgKey = "--batch=";
556        if (cmd.startsWith(batchArgKey)) {
557          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
558          continue;
559        }
560
561        final String familiesArgKey = "--families=";
562        if (cmd.startsWith(familiesArgKey)) {
563          families = cmd.substring(familiesArgKey.length());
564          continue;
565        }
566
567        final String rowPrefixesKey = "--row-prefixes=";
568        if (cmd.startsWith(rowPrefixesKey)){
569          rowPrefixes = cmd.substring(rowPrefixesKey.length());
570          continue;
571        }
572
573        final String delimiterArgKey = "--delimiter=";
574        if (cmd.startsWith(delimiterArgKey)) {
575          delimiter = cmd.substring(delimiterArgKey.length());
576          continue;
577        }
578
579        final String sleepToReCompareKey = "--recomparesleep=";
580        if (cmd.startsWith(sleepToReCompareKey)) {
581          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
582          continue;
583        }
584        final String verboseKey = "--verbose";
585        if (cmd.startsWith(verboseKey)) {
586          verbose = true;
587          continue;
588        }
589
590        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
591        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
592          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
593          continue;
594        }
595
596        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
597        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
598          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
599          continue;
600        }
601
602        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
603        if (cmd.startsWith(peerSnapshotNameArgKey)) {
604          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
605          continue;
606        }
607
608        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
609        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
610          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
611          continue;
612        }
613
614        final String peerFSAddressArgKey = "--peerFSAddress=";
615        if (cmd.startsWith(peerFSAddressArgKey)) {
616          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
617          continue;
618        }
619
620        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
621        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
622          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
623          continue;
624        }
625
626        final String peerTableNameArgKey = "--peerTableName=";
627        if (cmd.startsWith(peerTableNameArgKey)) {
628          peerTableName = cmd.substring(peerTableNameArgKey.length());
629          continue;
630        }
631
632        if (cmd.startsWith("--")) {
633          printUsage("Invalid argument '" + cmd + "'");
634          return false;
635        }
636
637        if (i == args.length-2) {
638          if (isPeerQuorumAddress(cmd)) {
639            peerQuorumAddress = cmd;
640          } else {
641            peerId = cmd;
642          }
643        }
644
645        if (i == args.length-1) {
646          tableName = cmd;
647        }
648      }
649
650      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
651          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
652        printUsage("Source snapshot name and snapshot temp location should be provided"
653            + " to use snapshots in source cluster");
654        return false;
655      }
656
657      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
658          || peerHBaseRootAddress != null) {
659        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
660            || peerHBaseRootAddress == null) {
661          printUsage(
662            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
663                + "peer FSAddress should be provided to use snapshots in peer cluster");
664          return false;
665        }
666      }
667
668      // This is to avoid making recompare calls to source/peer tables when snapshots are used
669      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
670        printUsage(
671          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
672        return false;
673      }
674
675    } catch (Exception e) {
676      e.printStackTrace();
677      printUsage("Can't start because " + e.getMessage());
678      return false;
679    }
680    return true;
681  }
682
683  private boolean isPeerQuorumAddress(String cmd) {
684    try {
685      ZKConfig.validateClusterKey(cmd);
686    } catch (IOException e) {
687      // not a quorum address
688      return false;
689    }
690    return true;
691  }
692
693  /*
694   * @param errorMsg Error message.  Can be null.
695   */
696  private static void printUsage(final String errorMsg) {
697    if (errorMsg != null && errorMsg.length() > 0) {
698      System.err.println("ERROR: " + errorMsg);
699    }
700    System.err.println("Usage: verifyrep [--starttime=X]"
701        + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
702        + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
703        + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
704        + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
705    System.err.println();
706    System.err.println("Options:");
707    System.err.println(" starttime    beginning of the time range");
708    System.err.println("              without endtime means from starttime to forever");
709    System.err.println(" endtime      end of the time range");
710    System.err.println(" versions     number of cell versions to verify");
711    System.err.println(" batch        batch count for scan, " +
712        "note that result row counts will no longer be actual number of rows when you use this option");
713    System.err.println(" raw          includes raw scan if given in options");
714    System.err.println(" families     comma-separated list of families to copy");
715    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
716    System.err.println(" delimiter    the delimiter used in display around rowkey");
717    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
718        "default value is 0 which disables the recompare.");
719    System.err.println(" verbose      logs row keys of good rows");
720    System.err.println(" peerTableName  Peer Table Name");
721    System.err.println(" sourceSnapshotName  Source Snapshot Name");
722    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
723    System.err.println(" peerSnapshotName  Peer Snapshot Name");
724    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
725    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
726    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
727    System.err.println();
728    System.err.println("Args:");
729    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
730    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
731      + "format is zk_quorum:zk_port:zk_hbase_path");
732    System.err.println(" tablename    Name of the table to verify");
733    System.err.println();
734    System.err.println("Examples:");
735    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
736    System.err.println(" $ hbase " +
737        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
738        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
739  }
740
741  @Override
742  public int run(String[] args) throws Exception {
743    Configuration conf = this.getConf();
744    Job job = createSubmittableJob(conf, args);
745    if (job != null) {
746      return job.waitForCompletion(true) ? 0 : 1;
747    }
748    return 1;
749  }
750
751  /**
752   * Main entry point.
753   *
754   * @param args  The command line parameters.
755   * @throws Exception When running the job fails.
756   */
757  public static void main(String[] args) throws Exception {
758    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
759    System.exit(res);
760  }
761}