001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce.replication;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.UUID;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.conf.Configured;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Get;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.ResultScanner;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableSnapshotScanner;
040import org.apache.hadoop.hbase.filter.Filter;
041import org.apache.hadoop.hbase.filter.FilterList;
042import org.apache.hadoop.hbase.filter.PrefixFilter;
043import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
044import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
045import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
046import org.apache.hadoop.hbase.mapreduce.TableMapper;
047import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
048import org.apache.hadoop.hbase.mapreduce.TableSplit;
049import org.apache.hadoop.hbase.replication.ReplicationException;
050import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
051import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
053import org.apache.hadoop.hbase.replication.ReplicationUtils;
054import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.CommonFSUtils;
057import org.apache.hadoop.hbase.util.Pair;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.hbase.zookeeper.ZKConfig;
060import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
061import org.apache.hadoop.mapreduce.InputSplit;
062import org.apache.hadoop.mapreduce.Job;
063import org.apache.hadoop.mapreduce.MRJobConfig;
064import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
065import org.apache.hadoop.util.Tool;
066import org.apache.hadoop.util.ToolRunner;
067import org.apache.yetus.audience.InterfaceAudience;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071/**
072 * This map-only job compares the data from a local table with a remote one. Every cell is compared
073 * and must have exactly the same keys (even timestamp) as well as same value. It is possible to
074 * restrict the job by time range and families. The peer id that's provided must match the one given
075 * when the replication stream was setup.
076 * <p>
077 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason why a row is
078 * different is shown in the map's log.
079 */
080@InterfaceAudience.Private
081public class VerifyReplication extends Configured implements Tool {
082
083  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class);
084
085  public final static String NAME = "verifyrep";
086  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
087  long startTime = 0;
088  long endTime = Long.MAX_VALUE;
089  int batch = -1;
090  int versions = -1;
091  String tableName = null;
092  String families = null;
093  String delimiter = "";
094  String peerId = null;
095  String peerQuorumAddress = null;
096  String rowPrefixes = null;
097  int sleepMsBeforeReCompare = 0;
098  boolean verbose = false;
099  boolean includeDeletedCells = false;
100  // Source table snapshot name
101  String sourceSnapshotName = null;
102  // Temp location in source cluster to restore source snapshot
103  String sourceSnapshotTmpDir = null;
104  // Peer table snapshot name
105  String peerSnapshotName = null;
106  // Temp location in peer cluster to restore peer snapshot
107  String peerSnapshotTmpDir = null;
108  // Peer cluster Hadoop FS address
109  String peerFSAddress = null;
110  // Peer cluster HBase root dir location
111  String peerHBaseRootAddress = null;
112  // Peer Table Name
113  String peerTableName = null;
114
115  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
116
117  /**
118   * Map-only comparator for 2 tables
119   */
120  public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> {
121
122    public enum Counters {
123      GOODROWS,
124      BADROWS,
125      ONLY_IN_SOURCE_TABLE_ROWS,
126      ONLY_IN_PEER_TABLE_ROWS,
127      CONTENT_DIFFERENT_ROWS
128    }
129
130    private Connection sourceConnection;
131    private Table sourceTable;
132    private Connection replicatedConnection;
133    private Table replicatedTable;
134    private ResultScanner replicatedScanner;
135    private Result currentCompareRowInPeerTable;
136    private int sleepMsBeforeReCompare;
137    private String delimiter = "";
138    private boolean verbose = false;
139    private int batch = -1;
140
141    /**
142     * Map method that compares every scanned row with the equivalent from a distant cluster.
143     * @param row     The current table row key.
144     * @param value   The columns.
145     * @param context The current context.
146     * @throws IOException When something is broken with the data.
147     */
148    @Override
149    public void map(ImmutableBytesWritable row, final Result value, Context context)
150      throws IOException {
151      if (replicatedScanner == null) {
152        Configuration conf = context.getConfiguration();
153        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
154        delimiter = conf.get(NAME + ".delimiter", "");
155        verbose = conf.getBoolean(NAME + ".verbose", false);
156        batch = conf.getInt(NAME + ".batch", -1);
157        final Scan scan = new Scan();
158        if (batch > 0) {
159          scan.setBatch(batch);
160        }
161        scan.setCacheBlocks(false);
162        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
163        long startTime = conf.getLong(NAME + ".startTime", 0);
164        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
165        String families = conf.get(NAME + ".families", null);
166        if (families != null) {
167          String[] fams = families.split(",");
168          for (String fam : fams) {
169            scan.addFamily(Bytes.toBytes(fam));
170          }
171        }
172        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
173        scan.setRaw(includeDeletedCells);
174        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
175        setRowPrefixFilter(scan, rowPrefixes);
176        scan.setTimeRange(startTime, endTime);
177        int versions = conf.getInt(NAME + ".versions", -1);
178        LOG.info("Setting number of version inside map as: " + versions);
179        if (versions >= 0) {
180          scan.setMaxVersions(versions);
181        }
182        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
183        sourceConnection = ConnectionFactory.createConnection(conf);
184        sourceTable = sourceConnection.getTable(tableName);
185
186        final InputSplit tableSplit = context.getInputSplit();
187
188        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
189        Configuration peerConf =
190          HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX);
191
192        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
193        TableName peerTableName = TableName.valueOf(peerName);
194        replicatedConnection = ConnectionFactory.createConnection(peerConf);
195        replicatedTable = replicatedConnection.getTable(peerTableName);
196        scan.setStartRow(value.getRow());
197
198        byte[] endRow = null;
199        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
200          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
201            .getEndKey();
202        } else {
203          endRow = ((TableSplit) tableSplit).getEndRow();
204        }
205
206        scan.setStopRow(endRow);
207
208        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
209        if (peerSnapshotName != null) {
210          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
211          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
212          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
213          FileSystem.setDefaultUri(peerConf, peerFSAddress);
214          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
215          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
216            + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf)
217            + " peerFSAddress:" + peerFSAddress);
218
219          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
220            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
221        } else {
222          replicatedScanner = replicatedTable.getScanner(scan);
223        }
224        currentCompareRowInPeerTable = replicatedScanner.next();
225      }
226      while (true) {
227        if (currentCompareRowInPeerTable == null) {
228          // reach the region end of peer table, row only in source table
229          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
230          break;
231        }
232        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
233        if (rowCmpRet == 0) {
234          // rowkey is same, need to compare the content of the row
235          try {
236            Result.compareResults(value, currentCompareRowInPeerTable, false);
237            context.getCounter(Counters.GOODROWS).increment(1);
238            if (verbose) {
239              LOG.info(
240                "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
241            }
242          } catch (Exception e) {
243            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
244          }
245          currentCompareRowInPeerTable = replicatedScanner.next();
246          break;
247        } else if (rowCmpRet < 0) {
248          // row only exists in source table
249          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
250          break;
251        } else {
252          // row only exists in peer table
253          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
254            currentCompareRowInPeerTable);
255          currentCompareRowInPeerTable = replicatedScanner.next();
256        }
257      }
258    }
259
260    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
261      if (sleepMsBeforeReCompare > 0) {
262        Threads.sleep(sleepMsBeforeReCompare);
263        try {
264          Result sourceResult = sourceTable.get(new Get(row.getRow()));
265          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
266          Result.compareResults(sourceResult, replicatedResult, false);
267          if (!sourceResult.isEmpty()) {
268            context.getCounter(Counters.GOODROWS).increment(1);
269            if (verbose) {
270              LOG.info("Good row key (with recompare): " + delimiter
271                + Bytes.toStringBinary(row.getRow()) + delimiter);
272            }
273          }
274          return;
275        } catch (Exception e) {
276          LOG.error("recompare fail after sleep, rowkey=" + delimiter
277            + Bytes.toStringBinary(row.getRow()) + delimiter);
278        }
279      }
280      context.getCounter(counter).increment(1);
281      context.getCounter(Counters.BADROWS).increment(1);
282      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow())
283        + delimiter);
284    }
285
286    @Override
287    protected void cleanup(Context context) {
288      if (replicatedScanner != null) {
289        try {
290          while (currentCompareRowInPeerTable != null) {
291            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
292              currentCompareRowInPeerTable);
293            currentCompareRowInPeerTable = replicatedScanner.next();
294          }
295        } catch (Exception e) {
296          LOG.error("fail to scan peer table in cleanup", e);
297        } finally {
298          replicatedScanner.close();
299          replicatedScanner = null;
300        }
301      }
302
303      if (sourceTable != null) {
304        try {
305          sourceTable.close();
306        } catch (IOException e) {
307          LOG.error("fail to close source table in cleanup", e);
308        }
309      }
310      if (sourceConnection != null) {
311        try {
312          sourceConnection.close();
313        } catch (Exception e) {
314          LOG.error("fail to close source connection in cleanup", e);
315        }
316      }
317
318      if (replicatedTable != null) {
319        try {
320          replicatedTable.close();
321        } catch (Exception e) {
322          LOG.error("fail to close replicated table in cleanup", e);
323        }
324      }
325      if (replicatedConnection != null) {
326        try {
327          replicatedConnection.close();
328        } catch (Exception e) {
329          LOG.error("fail to close replicated connection in cleanup", e);
330        }
331      }
332    }
333  }
334
335  private static Pair<ReplicationPeerConfig, Configuration>
336    getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
337    ZKWatcher localZKW = null;
338    try {
339      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
340        @Override
341        public void abort(String why, Throwable e) {
342        }
343
344        @Override
345        public boolean isAborted() {
346          return false;
347        }
348      });
349      ReplicationPeerStorage storage =
350        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
351      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
352      return Pair.newPair(peerConfig,
353        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
354    } catch (ReplicationException e) {
355      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
356        e);
357    } finally {
358      if (localZKW != null) {
359        localZKW.close();
360      }
361    }
362  }
363
364  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
365    throws IOException {
366    Configuration peerConf =
367      HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
368    FileSystem.setDefaultUri(peerConf, peerFSAddress);
369    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
370    FileSystem fs = FileSystem.get(peerConf);
371    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
372      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
373  }
374
375  /**
376   * Sets up the actual job.
377   * @param conf The current configuration.
378   * @param args The command line parameters.
379   * @return The newly created job.
380   * @throws java.io.IOException When setting up the job fails.
381   */
382  public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
383    if (!doCommandLine(args)) {
384      return null;
385    }
386    conf.set(NAME + ".tableName", tableName);
387    conf.setLong(NAME + ".startTime", startTime);
388    conf.setLong(NAME + ".endTime", endTime);
389    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
390    conf.set(NAME + ".delimiter", delimiter);
391    conf.setInt(NAME + ".batch", batch);
392    conf.setBoolean(NAME + ".verbose", verbose);
393    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
394    if (families != null) {
395      conf.set(NAME + ".families", families);
396    }
397    if (rowPrefixes != null) {
398      conf.set(NAME + ".rowPrefixes", rowPrefixes);
399    }
400
401    String peerQuorumAddress;
402    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
403    if (peerId != null) {
404      peerConfigPair = getPeerQuorumConfig(conf, peerId);
405      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
406      peerQuorumAddress = peerConfig.getClusterKey();
407      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
408        + peerConfig.getConfiguration());
409      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
410      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
411        peerConfig.getConfiguration().entrySet());
412    } else {
413      assert this.peerQuorumAddress != null;
414      peerQuorumAddress = this.peerQuorumAddress;
415      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
416      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
417    }
418
419    if (peerTableName != null) {
420      LOG.info("Peer Table Name: " + peerTableName);
421      conf.set(NAME + ".peerTableName", peerTableName);
422    }
423
424    conf.setInt(NAME + ".versions", versions);
425    LOG.info("Number of version: " + versions);
426
427    // Set Snapshot specific parameters
428    if (peerSnapshotName != null) {
429      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
430
431      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
432      // restore snapshot.
433      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
434      peerSnapshotTmpDir = restoreDir.toString();
435      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
436
437      conf.set(NAME + ".peerFSAddress", peerFSAddress);
438      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
439
440      // This is to create HDFS delegation token for peer cluster in case of secured
441      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
442    }
443
444    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
445    job.setJarByClass(VerifyReplication.class);
446
447    Scan scan = new Scan();
448    scan.setTimeRange(startTime, endTime);
449    scan.setRaw(includeDeletedCells);
450    scan.setCacheBlocks(false);
451    if (batch > 0) {
452      scan.setBatch(batch);
453    }
454    if (versions >= 0) {
455      scan.setMaxVersions(versions);
456      LOG.info("Number of versions set to " + versions);
457    }
458    if (families != null) {
459      String[] fams = families.split(",");
460      for (String fam : fams) {
461        scan.addFamily(Bytes.toBytes(fam));
462      }
463    }
464
465    setRowPrefixFilter(scan, rowPrefixes);
466
467    if (sourceSnapshotName != null) {
468      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
469      LOG.info(
470        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
471      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
472        null, job, true, snapshotTempPath);
473      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
474    } else {
475      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
476    }
477
478    Configuration peerClusterConf;
479    if (peerId != null) {
480      assert peerConfigPair != null;
481      peerClusterConf = peerConfigPair.getSecond();
482    } else {
483      peerClusterConf =
484        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
485    }
486    // Obtain the auth token from peer cluster
487    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
488
489    job.setOutputFormatClass(NullOutputFormat.class);
490    job.setNumReduceTasks(0);
491    return job;
492  }
493
494  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
495    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
496      String[] rowPrefixArray = rowPrefixes.split(",");
497      Arrays.sort(rowPrefixArray);
498      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
499      for (String prefix : rowPrefixArray) {
500        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
501        filterList.addFilter(filter);
502      }
503      scan.setFilter(filterList);
504      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
505      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
506      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
507    }
508  }
509
510  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
511    scan.setStartRow(startPrefixRow);
512    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
513      new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
514    scan.setStopRow(stopRow);
515  }
516
517  public boolean doCommandLine(final String[] args) {
518    if (args.length < 2) {
519      printUsage(null);
520      return false;
521    }
522    try {
523      for (int i = 0; i < args.length; i++) {
524        String cmd = args[i];
525        if (cmd.equals("-h") || cmd.startsWith("--h")) {
526          printUsage(null);
527          return false;
528        }
529
530        final String startTimeArgKey = "--starttime=";
531        if (cmd.startsWith(startTimeArgKey)) {
532          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
533          continue;
534        }
535
536        final String endTimeArgKey = "--endtime=";
537        if (cmd.startsWith(endTimeArgKey)) {
538          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
539          continue;
540        }
541
542        final String includeDeletedCellsArgKey = "--raw";
543        if (cmd.equals(includeDeletedCellsArgKey)) {
544          includeDeletedCells = true;
545          continue;
546        }
547
548        final String versionsArgKey = "--versions=";
549        if (cmd.startsWith(versionsArgKey)) {
550          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
551          continue;
552        }
553
554        final String batchArgKey = "--batch=";
555        if (cmd.startsWith(batchArgKey)) {
556          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
557          continue;
558        }
559
560        final String familiesArgKey = "--families=";
561        if (cmd.startsWith(familiesArgKey)) {
562          families = cmd.substring(familiesArgKey.length());
563          continue;
564        }
565
566        final String rowPrefixesKey = "--row-prefixes=";
567        if (cmd.startsWith(rowPrefixesKey)) {
568          rowPrefixes = cmd.substring(rowPrefixesKey.length());
569          continue;
570        }
571
572        final String delimiterArgKey = "--delimiter=";
573        if (cmd.startsWith(delimiterArgKey)) {
574          delimiter = cmd.substring(delimiterArgKey.length());
575          continue;
576        }
577
578        final String sleepToReCompareKey = "--recomparesleep=";
579        if (cmd.startsWith(sleepToReCompareKey)) {
580          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
581          continue;
582        }
583        final String verboseKey = "--verbose";
584        if (cmd.startsWith(verboseKey)) {
585          verbose = true;
586          continue;
587        }
588
589        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
590        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
591          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
592          continue;
593        }
594
595        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
596        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
597          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
598          continue;
599        }
600
601        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
602        if (cmd.startsWith(peerSnapshotNameArgKey)) {
603          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
604          continue;
605        }
606
607        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
608        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
609          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
610          continue;
611        }
612
613        final String peerFSAddressArgKey = "--peerFSAddress=";
614        if (cmd.startsWith(peerFSAddressArgKey)) {
615          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
616          continue;
617        }
618
619        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
620        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
621          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
622          continue;
623        }
624
625        final String peerTableNameArgKey = "--peerTableName=";
626        if (cmd.startsWith(peerTableNameArgKey)) {
627          peerTableName = cmd.substring(peerTableNameArgKey.length());
628          continue;
629        }
630
631        if (cmd.startsWith("--")) {
632          printUsage("Invalid argument '" + cmd + "'");
633          return false;
634        }
635
636        if (i == args.length - 2) {
637          if (isPeerQuorumAddress(cmd)) {
638            peerQuorumAddress = cmd;
639          } else {
640            peerId = cmd;
641          }
642        }
643
644        if (i == args.length - 1) {
645          tableName = cmd;
646        }
647      }
648
649      if (
650        (sourceSnapshotName != null && sourceSnapshotTmpDir == null)
651          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)
652      ) {
653        printUsage("Source snapshot name and snapshot temp location should be provided"
654          + " to use snapshots in source cluster");
655        return false;
656      }
657
658      if (
659        peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
660          || peerHBaseRootAddress != null
661      ) {
662        if (
663          peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
664            || peerHBaseRootAddress == null
665        ) {
666          printUsage(
667            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
668              + "peer FSAddress should be provided to use snapshots in peer cluster");
669          return false;
670        }
671      }
672
673      // This is to avoid making recompare calls to source/peer tables when snapshots are used
674      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
675        printUsage(
676          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
677            + " immutable");
678        return false;
679      }
680
681    } catch (Exception e) {
682      e.printStackTrace();
683      printUsage("Can't start because " + e.getMessage());
684      return false;
685    }
686    return true;
687  }
688
689  private boolean isPeerQuorumAddress(String cmd) {
690    try {
691      ZKConfig.validateClusterKey(cmd);
692    } catch (IOException e) {
693      // not a quorum address
694      return false;
695    }
696    return true;
697  }
698
699  /*
700   * @param errorMsg Error message. Can be null.
701   */
702  private static void printUsage(final String errorMsg) {
703    if (errorMsg != null && errorMsg.length() > 0) {
704      System.err.println("ERROR: " + errorMsg);
705    }
706    System.err.println("Usage: verifyrep [--starttime=X]"
707      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
708      + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
709      + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
710      + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
711    System.err.println();
712    System.err.println("Options:");
713    System.err.println(" starttime    beginning of the time range");
714    System.err.println("              without endtime means from starttime to forever");
715    System.err.println(" endtime      end of the time range");
716    System.err.println(" versions     number of cell versions to verify");
717    System.err.println(" batch        batch count for scan, note that"
718      + " result row counts will no longer be actual number of rows when you use this option");
719    System.err.println(" raw          includes raw scan if given in options");
720    System.err.println(" families     comma-separated list of families to copy");
721    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
722    System.err.println(" delimiter    the delimiter used in display around rowkey");
723    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, "
724      + "default value is 0 which disables the recompare.");
725    System.err.println(" verbose      logs row keys of good rows");
726    System.err.println(" peerTableName  Peer Table Name");
727    System.err.println(" sourceSnapshotName  Source Snapshot Name");
728    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
729    System.err.println(" peerSnapshotName  Peer Snapshot Name");
730    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
731    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
732    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
733    System.err.println();
734    System.err.println("Args:");
735    System.err.println(" peerid       Id of the peer used for verification,"
736      + " must match the one given for replication");
737    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
738      + "format is zk_quorum:zk_port:zk_hbase_path");
739    System.err.println(" tablename    Name of the table to verify");
740    System.err.println();
741    System.err.println("Examples:");
742    System.err
743      .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
744    System.err
745      .println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication"
746        + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
747    System.err.println();
748    System.err.println(
749      " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
750    System.err.println(" Assume quorum address for cluster-b is"
751      + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
752    System.err
753      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
754        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
755        + "2181:/cluster-b \\\n" + "     TestTable");
756    System.err.println();
757    System.err
758      .println(" To verify the data in TestTable between the secured cluster runs VerifyReplication"
759        + " and insecure cluster-b");
760    System.err
761      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
762        + "     -D verifyrep.peer.hbase.security.authentication=simple \\\n"
763        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
764        + "2181:/cluster-b \\\n" + "     TestTable");
765    System.err.println();
766    System.err.println(" To verify the data in TestTable between"
767      + " the secured cluster runs VerifyReplication and secured cluster-b");
768    System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E"
769      + ", for master and regionserver kerberos principal from another cluster");
770    System.err
771      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
772        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
773        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
774        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
775        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
776        + "2181:/cluster-b \\\n" + "     TestTable");
777    System.err.println();
778    System.err.println(
779      " To verify the data in TestTable between the insecure cluster runs VerifyReplication"
780        + " and secured cluster-b");
781    System.err
782      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
783        + "     -D verifyrep.peer.hbase.security.authentication=kerberos \\\n"
784        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
785        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
786        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
787        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
788        + "2181:/cluster-b \\\n" + "     TestTable");
789  }
790
791  @Override
792  public int run(String[] args) throws Exception {
793    Configuration conf = this.getConf();
794    Job job = createSubmittableJob(conf, args);
795    if (job != null) {
796      return job.waitForCompletion(true) ? 0 : 1;
797    }
798    return 1;
799  }
800
801  /**
802   * Main entry point.
803   * @param args The command line parameters.
804   * @throws Exception When running the job fails.
805   */
806  public static void main(String[] args) throws Exception {
807    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
808    System.exit(res);
809  }
810}