001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce.replication;
019
020import java.io.IOException;
021import java.net.URI;
022import java.util.Arrays;
023import java.util.List;
024import java.util.UUID;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
028import java.util.concurrent.TimeUnit;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.conf.Configured;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.ConnectionFactory;
039import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
040import org.apache.hadoop.hbase.client.Put;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.ResultScanner;
043import org.apache.hadoop.hbase.client.Scan;
044import org.apache.hadoop.hbase.client.Table;
045import org.apache.hadoop.hbase.client.TableSnapshotScanner;
046import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
047import org.apache.hadoop.hbase.filter.Filter;
048import org.apache.hadoop.hbase.filter.FilterList;
049import org.apache.hadoop.hbase.filter.PrefixFilter;
050import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
051import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
053import org.apache.hadoop.hbase.mapreduce.TableMapper;
054import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
055import org.apache.hadoop.hbase.mapreduce.TableSplit;
056import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
057import org.apache.hadoop.hbase.replication.ReplicationException;
058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
059import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
060import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
061import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.util.CommonFSUtils;
064import org.apache.hadoop.hbase.util.Pair;
065import org.apache.hadoop.hbase.util.Strings;
066import org.apache.hadoop.hbase.zookeeper.ZKConfig;
067import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
068import org.apache.hadoop.mapreduce.InputSplit;
069import org.apache.hadoop.mapreduce.Job;
070import org.apache.hadoop.mapreduce.MRJobConfig;
071import org.apache.hadoop.mapreduce.Mapper;
072import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
073import org.apache.hadoop.util.Tool;
074import org.apache.hadoop.util.ToolRunner;
075import org.apache.yetus.audience.InterfaceAudience;
076import org.slf4j.Logger;
077import org.slf4j.LoggerFactory;
078
/**
 * This map-only job compares the data from a local table with a remote one. Every cell is compared
 * and must have exactly the same keys (even timestamp) as well as same value. It is possible to
 * restrict the job by time range and families. The peer id that's provided must match the one given
 * when the replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason why a row is
 * different is shown in the map's log.
 */
088@InterfaceAudience.Private
089public class VerifyReplication extends Configured implements Tool {
090
  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class);

  // Tool name; also the prefix of the job-configuration keys this tool writes and reads.
  public final static String NAME = "verifyrep";
  // Key prefix under which peer-cluster configuration overrides are stored in the job conf.
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  // Executor for row re-comparisons. Assigned by Verifier.map on its first call and shut down in
  // Verifier.cleanup; while it is null, re-comparisons run on the calling thread.
  private static ThreadPoolExecutor reCompareExecutor = null;
  // The fields below hold values parsed by doCommandLine.
  // --recompareTries=
  int reCompareTries = 0;
  // --recompareBackoffExponent=
  int reCompareBackoffExponent = 0;
  // --recompareThreads=
  int reCompareThreads = 0;
  // --recompareSleep= (or the deprecated --recomparesleep=)
  int sleepMsBeforeReCompare = 0;
  // --starttime= / --endtime=: bounds handed to Scan.setTimeRange
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  // --batch=: applied to the scan only when greater than zero
  int batch = -1;
  // --versions=: applied to the scan only when zero or greater
  int versions = -1;
  // Last positional argument
  String tableName = null;
  // --families=: comma-separated column family names
  String families = null;
  // --delimiter=: text placed on both sides of row keys in log output
  String delimiter = "";
  // Second-to-last positional argument: stored as peerQuorumAddress when isPeerQuorumAddress
  // accepts it, otherwise as peerId
  String peerId = null;
  String peerQuorumAddress = null;
  // --row-prefixes=: comma-separated row key prefixes
  String rowPrefixes = null;
  // --verbose
  boolean verbose = false;
  // --raw: passed to Scan.setRaw
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;
  // Peer Table Name
  String peerTableName = null;

  // Configuration key consulted for a user-supplied job name in createSubmittableJob.
  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
128
129  /**
130   * Map-only comparator for 2 tables
131   */
132  public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> {
133
    /**
     * Job counters updated while rows are compared. Counters marked "NOTE(review)" are not
     * incremented anywhere in this class's visible code.
     */
    public enum Counters {
      // Source row and peer row with the same key passed Result.compareResults.
      GOODROWS,
      // Incremented together with the specific failure counter when no recompare is configured,
      // and in cleanup for recompares that did not finish.
      BADROWS,
      // Failure category: the row was seen in the source scan but not in the peer scan.
      ONLY_IN_SOURCE_TABLE_ROWS,
      // Failure category: the row was seen in the peer scan but not in the source scan.
      ONLY_IN_PEER_TABLE_ROWS,
      // Failure category: same row key on both sides but Result.compareResults threw.
      CONTENT_DIFFERENT_ROWS,
      // NOTE(review): presumably maintained by VerifyReplicationRecompareRunnable - confirm.
      RECOMPARES,
      // NOTE(review): presumably counts recompares run on the mapper thread after the executor
      // rejected them - confirm against the rejection policy.
      MAIN_THREAD_RECOMPARES,
      // NOTE(review): presumably maintained by VerifyReplicationRecompareRunnable - confirm.
      SOURCE_ROW_CHANGED,
      // NOTE(review): presumably maintained by VerifyReplicationRecompareRunnable - confirm.
      PEER_ROW_CHANGED,
      // Incremented in cleanup for recompares still running when the executor fails to terminate.
      FAILED_RECOMPARE
    }
146
    // Connection to the source cluster and the source table obtained from it; the table is handed
    // to recompare runnables.
    private Connection sourceConnection;
    private Table sourceTable;
    // Connection to the peer cluster and the peer table obtained from it.
    private Connection replicatedConnection;
    private Table replicatedTable;
    // Scanner over the peer table or peer snapshot; null until the first map() call and again
    // after cleanup().
    private ResultScanner replicatedScanner;
    // Peer row currently being compared against; null once the peer scanner is exhausted.
    private Result currentCompareRowInPeerTable;
    // Scan used to open the peer scanner; also handed to recompare runnables.
    private Scan tableScan;
    // The remaining fields are read from the job configuration on the first map() call.
    private int reCompareTries;
    private int reCompareBackoffExponent;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;
160
161    /**
162     * Map method that compares every scanned row with the equivalent from a distant cluster.
163     * @param row     The current table row key.
164     * @param value   The columns.
165     * @param context The current context.
166     * @throws IOException When something is broken with the data.
167     */
168    @Override
169    public void map(ImmutableBytesWritable row, final Result value, Context context)
170      throws IOException {
171      if (replicatedScanner == null) {
172        Configuration conf = context.getConfiguration();
173        reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
174        reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
175        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
176        if (sleepMsBeforeReCompare > 0) {
177          reCompareTries = Math.max(reCompareTries, 1);
178        }
179        delimiter = conf.get(NAME + ".delimiter", "");
180        verbose = conf.getBoolean(NAME + ".verbose", false);
181        batch = conf.getInt(NAME + ".batch", -1);
182        final Scan scan = new Scan();
183        if (batch > 0) {
184          scan.setBatch(batch);
185        }
186        scan.setCacheBlocks(false);
187        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
188        long startTime = conf.getLong(NAME + ".startTime", 0);
189        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
190        String families = conf.get(NAME + ".families", null);
191        if (families != null) {
192          String[] fams = families.split(",");
193          for (String fam : fams) {
194            scan.addFamily(Bytes.toBytes(fam));
195          }
196        }
197        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
198        scan.setRaw(includeDeletedCells);
199        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
200        setRowPrefixFilter(scan, rowPrefixes);
201        scan.setTimeRange(startTime, endTime);
202        int versions = conf.getInt(NAME + ".versions", -1);
203        LOG.info("Setting number of version inside map as: " + versions);
204        if (versions >= 0) {
205          scan.readVersions(versions);
206        }
207        int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
208        reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
209        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
210        sourceConnection = ConnectionFactory.createConnection(conf);
211        sourceTable = sourceConnection.getTable(tableName);
212        tableScan = scan;
213
214        final InputSplit tableSplit = context.getInputSplit();
215
216        String peerQuorumAddress = conf.get(NAME + ".peerQuorumAddress");
217        URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress);
218        Configuration peerConf;
219        if (connectionUri != null) {
220          peerConf = HBaseConfiguration.create(conf);
221        } else {
222          peerConf =
223            HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
224        }
225        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
226        TableName peerTableName = TableName.valueOf(peerName);
227        replicatedConnection = ConnectionFactory.createConnection(connectionUri, peerConf);
228        replicatedTable = replicatedConnection.getTable(peerTableName);
229        scan.withStartRow(value.getRow());
230
231        byte[] endRow = null;
232        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
233          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegion()
234            .getEndKey();
235        } else {
236          endRow = ((TableSplit) tableSplit).getEndRow();
237        }
238
239        scan.withStopRow(endRow);
240
241        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
242        if (peerSnapshotName != null) {
243          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
244          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
245          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
246          FileSystem.setDefaultUri(peerConf, peerFSAddress);
247          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
248          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
249            + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf)
250            + " peerFSAddress:" + peerFSAddress);
251
252          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
253            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
254        } else {
255          replicatedScanner = replicatedTable.getScanner(scan);
256        }
257        currentCompareRowInPeerTable = replicatedScanner.next();
258      }
259      while (true) {
260        if (currentCompareRowInPeerTable == null) {
261          // reach the region end of peer table, row only in source table
262          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
263          break;
264        }
265        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
266        if (rowCmpRet == 0) {
267          // rowkey is same, need to compare the content of the row
268          try {
269            Result.compareResults(value, currentCompareRowInPeerTable, false);
270            context.getCounter(Counters.GOODROWS).increment(1);
271            if (verbose) {
272              LOG.info(
273                "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
274            }
275          } catch (Exception e) {
276            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
277              currentCompareRowInPeerTable);
278          }
279          currentCompareRowInPeerTable = replicatedScanner.next();
280          break;
281        } else if (rowCmpRet < 0) {
282          // row only exists in source table
283          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
284          break;
285        } else {
286          // row only exists in peer table
287          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
288            currentCompareRowInPeerTable);
289          currentCompareRowInPeerTable = replicatedScanner.next();
290        }
291      }
292    }
293
294    @SuppressWarnings("FutureReturnValueIgnored")
295    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
296      Result replicatedRow) {
297      byte[] rowKey = getRow(row, replicatedRow);
298      if (reCompareTries == 0) {
299        context.getCounter(counter).increment(1);
300        context.getCounter(Counters.BADROWS).increment(1);
301        LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
302        return;
303      }
304
305      VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
306        row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
307        reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
308
309      if (reCompareExecutor == null) {
310        runnable.run();
311        return;
312      }
313
314      reCompareExecutor.submit(runnable);
315    }
316
317    @Override
318    protected void cleanup(Context context) {
319      if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
320        reCompareExecutor.shutdown();
321        try {
322          boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
323          if (!terminated) {
324            List<Runnable> queue = reCompareExecutor.shutdownNow();
325            for (Runnable runnable : queue) {
326              ((VerifyReplicationRecompareRunnable) runnable).fail();
327            }
328
329            terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
330
331            if (!terminated) {
332              int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
333              LOG.warn("Found {} possible recompares still running in the executable"
334                + " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
335              context.getCounter(Counters.BADROWS).increment(activeCount);
336              context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
337            }
338          }
339        } catch (InterruptedException e) {
340          throw new RuntimeException("Failed to await executor termination in cleanup", e);
341        }
342      }
343      if (replicatedScanner != null) {
344        try {
345          while (currentCompareRowInPeerTable != null) {
346            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
347              currentCompareRowInPeerTable);
348            currentCompareRowInPeerTable = replicatedScanner.next();
349          }
350        } catch (Exception e) {
351          LOG.error("fail to scan peer table in cleanup", e);
352        } finally {
353          replicatedScanner.close();
354          replicatedScanner = null;
355        }
356      }
357
358      if (sourceTable != null) {
359        try {
360          sourceTable.close();
361        } catch (IOException e) {
362          LOG.error("fail to close source table in cleanup", e);
363        }
364      }
365      if (sourceConnection != null) {
366        try {
367          sourceConnection.close();
368        } catch (Exception e) {
369          LOG.error("fail to close source connection in cleanup", e);
370        }
371      }
372
373      if (replicatedTable != null) {
374        try {
375          replicatedTable.close();
376        } catch (Exception e) {
377          LOG.error("fail to close replicated table in cleanup", e);
378        }
379      }
380      if (replicatedConnection != null) {
381        try {
382          replicatedConnection.close();
383        } catch (Exception e) {
384          LOG.error("fail to close replicated connection in cleanup", e);
385        }
386      }
387    }
388  }
389
390  private static Pair<ReplicationPeerConfig, Configuration>
391    getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
392    ZKWatcher localZKW = null;
393    try {
394      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
395        @Override
396        public void abort(String why, Throwable e) {
397        }
398
399        @Override
400        public boolean isAborted() {
401          return false;
402        }
403      });
404      ReplicationPeerStorage storage =
405        ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
406      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
407      return Pair.newPair(peerConfig,
408        ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig));
409    } catch (ReplicationException e) {
410      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
411        e);
412    } finally {
413      if (localZKW != null) {
414        localZKW.close();
415      }
416    }
417  }
418
419  private Configuration applyURIConf(Configuration conf, URI uri) {
420    Configuration peerConf = HBaseConfiguration.subset(conf, PEER_CONFIG_PREFIX);
421    HBaseConfiguration.merge(peerConf, conf);
422    Strings.applyURIQueriesToConf(uri, peerConf);
423    return peerConf;
424  }
425
426  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
427    throws IOException {
428    URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress);
429    Configuration peerConf;
430    if (uri != null) {
431      peerConf = applyURIConf(conf, uri);
432    } else {
433      peerConf = HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
434    }
435    FileSystem.setDefaultUri(peerConf, peerFSAddress);
436    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
437    FileSystem fs = FileSystem.get(peerConf);
438    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
439      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
440  }
441
442  /**
443   * Sets up the actual job.
444   * @param conf The current configuration.
445   * @param args The command line parameters.
446   * @return The newly created job.
447   * @throws java.io.IOException When setting up the job fails.
448   */
449  public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
450    if (!doCommandLine(args)) {
451      return null;
452    }
453    conf.set(NAME + ".tableName", tableName);
454    conf.setLong(NAME + ".startTime", startTime);
455    conf.setLong(NAME + ".endTime", endTime);
456    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
457    conf.set(NAME + ".delimiter", delimiter);
458    conf.setInt(NAME + ".batch", batch);
459    conf.setBoolean(NAME + ".verbose", verbose);
460    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
461    if (families != null) {
462      conf.set(NAME + ".families", families);
463    }
464    if (rowPrefixes != null) {
465      conf.set(NAME + ".rowPrefixes", rowPrefixes);
466    }
467
468    String peerQuorumAddress;
469    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
470    if (peerId != null) {
471      peerConfigPair = getPeerQuorumConfig(conf, peerId);
472      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
473      peerQuorumAddress = peerConfig.getClusterKey();
474      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
475        + peerConfig.getConfiguration());
476      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
477      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
478        peerConfig.getConfiguration().entrySet());
479    } else {
480      assert this.peerQuorumAddress != null;
481      peerQuorumAddress = this.peerQuorumAddress;
482      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
483      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
484    }
485
486    if (peerTableName != null) {
487      LOG.info("Peer Table Name: " + peerTableName);
488      conf.set(NAME + ".peerTableName", peerTableName);
489    }
490
491    conf.setInt(NAME + ".versions", versions);
492    LOG.info("Number of version: " + versions);
493
494    conf.setInt(NAME + ".recompareTries", reCompareTries);
495    conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
496    conf.setInt(NAME + ".recompareThreads", reCompareThreads);
497
498    // Set Snapshot specific parameters
499    if (peerSnapshotName != null) {
500      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
501
502      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
503      // restore snapshot.
504      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
505      peerSnapshotTmpDir = restoreDir.toString();
506      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
507
508      conf.set(NAME + ".peerFSAddress", peerFSAddress);
509      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
510
511      // This is to create HDFS delegation token for peer cluster in case of secured
512      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
513    }
514
515    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
516    job.setJarByClass(VerifyReplication.class);
517
518    Scan scan = new Scan();
519    scan.setTimeRange(startTime, endTime);
520    scan.setRaw(includeDeletedCells);
521    scan.setCacheBlocks(false);
522    if (batch > 0) {
523      scan.setBatch(batch);
524    }
525    if (versions >= 0) {
526      scan.readVersions(versions);
527      LOG.info("Number of versions set to " + versions);
528    }
529    if (families != null) {
530      String[] fams = families.split(",");
531      for (String fam : fams) {
532        scan.addFamily(Bytes.toBytes(fam));
533      }
534    }
535
536    setRowPrefixFilter(scan, rowPrefixes);
537
538    if (sourceSnapshotName != null) {
539      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
540      LOG.info(
541        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
542      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
543        null, job, true, snapshotTempPath);
544      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
545    } else {
546      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
547    }
548
549    Configuration peerClusterBaseConf;
550    if (peerId != null) {
551      assert peerConfigPair != null;
552      peerClusterBaseConf = peerConfigPair.getSecond();
553    } else {
554      peerClusterBaseConf = conf;
555    }
556    Configuration peerClusterConf;
557    URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress);
558    if (uri != null) {
559      peerClusterConf = new Configuration(peerClusterBaseConf);
560      applyURIConf(peerClusterConf, uri);
561    } else {
562      peerClusterConf = HBaseConfiguration.createClusterConf(peerClusterBaseConf, peerQuorumAddress,
563        PEER_CONFIG_PREFIX);
564    }
565    // Obtain the auth token from peer cluster
566    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf, uri);
567
568    job.setOutputFormatClass(NullOutputFormat.class);
569    job.setNumReduceTasks(0);
570    return job;
571  }
572
573  protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
574    if (sourceResult != null) {
575      return sourceResult.getRow();
576    } else if (replicatedResult != null) {
577      return replicatedResult.getRow();
578    }
579    throw new RuntimeException("Both sourceResult and replicatedResult are null!");
580  }
581
582  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
583    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
584      String[] rowPrefixArray = rowPrefixes.split(",");
585      Arrays.sort(rowPrefixArray);
586      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
587      for (String prefix : rowPrefixArray) {
588        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
589        filterList.addFilter(filter);
590      }
591      scan.setFilter(filterList);
592      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
593      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
594      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
595    }
596  }
597
598  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
599    scan.withStartRow(startPrefixRow);
600    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
601      new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
602    scan.withStopRow(stopRow);
603  }
604
605  public boolean doCommandLine(final String[] args) {
606    if (args.length < 2) {
607      printUsage(null);
608      return false;
609    }
610    try {
611      for (int i = 0; i < args.length; i++) {
612        String cmd = args[i];
613        if (cmd.equals("-h") || cmd.startsWith("--h")) {
614          printUsage(null);
615          return false;
616        }
617
618        final String startTimeArgKey = "--starttime=";
619        if (cmd.startsWith(startTimeArgKey)) {
620          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
621          continue;
622        }
623
624        final String endTimeArgKey = "--endtime=";
625        if (cmd.startsWith(endTimeArgKey)) {
626          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
627          continue;
628        }
629
630        final String includeDeletedCellsArgKey = "--raw";
631        if (cmd.equals(includeDeletedCellsArgKey)) {
632          includeDeletedCells = true;
633          continue;
634        }
635
636        final String versionsArgKey = "--versions=";
637        if (cmd.startsWith(versionsArgKey)) {
638          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
639          continue;
640        }
641
642        final String batchArgKey = "--batch=";
643        if (cmd.startsWith(batchArgKey)) {
644          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
645          continue;
646        }
647
648        final String familiesArgKey = "--families=";
649        if (cmd.startsWith(familiesArgKey)) {
650          families = cmd.substring(familiesArgKey.length());
651          continue;
652        }
653
654        final String rowPrefixesKey = "--row-prefixes=";
655        if (cmd.startsWith(rowPrefixesKey)) {
656          rowPrefixes = cmd.substring(rowPrefixesKey.length());
657          continue;
658        }
659
660        final String delimiterArgKey = "--delimiter=";
661        if (cmd.startsWith(delimiterArgKey)) {
662          delimiter = cmd.substring(delimiterArgKey.length());
663          continue;
664        }
665
666        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
667        final String sleepToReCompareKey = "--recompareSleep=";
668        if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
669          LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
670            + " Use --recompareSleep instead.");
671          sleepMsBeforeReCompare =
672            Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
673          continue;
674        }
675        if (cmd.startsWith(sleepToReCompareKey)) {
676          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
677          continue;
678        }
679
680        final String verboseKey = "--verbose";
681        if (cmd.startsWith(verboseKey)) {
682          verbose = true;
683          continue;
684        }
685
686        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
687        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
688          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
689          continue;
690        }
691
692        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
693        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
694          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
695          continue;
696        }
697
698        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
699        if (cmd.startsWith(peerSnapshotNameArgKey)) {
700          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
701          continue;
702        }
703
704        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
705        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
706          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
707          continue;
708        }
709
710        final String peerFSAddressArgKey = "--peerFSAddress=";
711        if (cmd.startsWith(peerFSAddressArgKey)) {
712          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
713          continue;
714        }
715
716        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
717        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
718          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
719          continue;
720        }
721
722        final String peerTableNameArgKey = "--peerTableName=";
723        if (cmd.startsWith(peerTableNameArgKey)) {
724          peerTableName = cmd.substring(peerTableNameArgKey.length());
725          continue;
726        }
727
728        final String reCompareThreadArgs = "--recompareThreads=";
729        if (cmd.startsWith(reCompareThreadArgs)) {
730          reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
731          continue;
732        }
733
734        final String reCompareTriesKey = "--recompareTries=";
735        if (cmd.startsWith(reCompareTriesKey)) {
736          reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
737          continue;
738        }
739
740        final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
741        if (cmd.startsWith(reCompareBackoffExponentKey)) {
742          reCompareBackoffExponent =
743            Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
744          continue;
745        }
746
747        if (cmd.startsWith("--")) {
748          printUsage("Invalid argument '" + cmd + "'");
749          return false;
750        }
751
752        if (i == args.length - 2) {
753          if (isPeerQuorumAddress(cmd)) {
754            peerQuorumAddress = cmd;
755          } else {
756            peerId = cmd;
757          }
758        }
759
760        if (i == args.length - 1) {
761          tableName = cmd;
762        }
763      }
764
765      if (
766        (sourceSnapshotName != null && sourceSnapshotTmpDir == null)
767          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)
768      ) {
769        printUsage("Source snapshot name and snapshot temp location should be provided"
770          + " to use snapshots in source cluster");
771        return false;
772      }
773
774      if (
775        peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
776          || peerHBaseRootAddress != null
777      ) {
778        if (
779          peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
780            || peerHBaseRootAddress == null
781        ) {
782          printUsage(
783            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
784              + "peer FSAddress should be provided to use snapshots in peer cluster");
785          return false;
786        }
787      }
788
789      // This is to avoid making recompare calls to source/peer tables when snapshots are used
790      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
791        printUsage(
792          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
793            + " immutable");
794        return false;
795      }
796
797    } catch (Exception e) {
798      LOG.error("Failed to parse commandLine arguments", e);
799      printUsage("Can't start because " + e.getMessage());
800      return false;
801    }
802    return true;
803  }
804
805  private boolean isPeerQuorumAddress(String cmd) {
806    if (ConnectionRegistryFactory.tryParseAsConnectionURI(cmd) != null) {
807      return true;
808    }
809    try {
810      ZKConfig.validateClusterKey(cmd);
811    } catch (IOException e) {
812      // not a quorum address
813      return false;
814    }
815    return true;
816  }
817
818  /*
819   * @param errorMsg Error message. Can be null.
820   */
821  private static void printUsage(final String errorMsg) {
822    if (errorMsg != null && errorMsg.length() > 0) {
823      System.err.println("ERROR: " + errorMsg);
824    }
825    System.err.println("Usage: verifyrep [--starttime=X]"
826      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
827      + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
828      + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
829      + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
830      + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
831    System.err.println();
832    System.err.println("Options:");
833    System.err.println(" starttime    beginning of the time range");
834    System.err.println("              without endtime means from starttime to forever");
835    System.err.println(" endtime      end of the time range");
836    System.err.println(" versions     number of cell versions to verify");
837    System.err.println(" batch        batch count for scan, note that"
838      + " result row counts will no longer be actual number of rows when you use this option");
839    System.err.println(" raw          includes raw scan if given in options");
840    System.err.println(" families     comma-separated list of families to copy");
841    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
842    System.err.println(" delimiter    the delimiter used in display around rowkey");
843    System.err.println(" recompareSleep   milliseconds to sleep before recompare row, "
844      + "default value is 0 which disables the recompare.");
845    System.err.println(" recompareThreads number of threads to run recompares in");
846    System.err.println(" recompareTries   number of recompare attempts before incrementing "
847      + "the BADROWS counter. Defaults to 1 recompare");
848    System.out.println(" recompareBackoffExponent exponential multiplier to increase "
849      + "recompareSleep after each recompare attempt, "
850      + "default value is 0 which results in a constant sleep time");
851    System.err.println(" verbose      logs row keys of good rows");
852    System.err.println(" peerTableName  Peer Table Name");
853    System.err.println(" sourceSnapshotName  Source Snapshot Name");
854    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
855    System.err.println(" peerSnapshotName  Peer Snapshot Name");
856    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
857    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
858    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
859    System.err.println();
860    System.err.println("Args:");
861    System.err.println(" peerid       Id of the peer used for verification,"
862      + " must match the one given for replication");
863    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
864      + "format is zk_quorum:zk_port:zk_hbase_path");
865    System.err.println(" tablename    Name of the table to verify");
866    System.err.println();
867    System.err.println("Examples:");
868    System.err
869      .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
870    System.err
871      .println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication"
872        + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
873    System.err.println();
874    System.err.println(
875      " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
876    System.err.println(" Assume quorum address for cluster-b is"
877      + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
878    System.err
879      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
880        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
881        + "2181:/cluster-b \\\n" + "     TestTable");
882    System.err.println();
883    System.err
884      .println(" To verify the data in TestTable between the secured cluster runs VerifyReplication"
885        + " and insecure cluster-b");
886    System.err
887      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
888        + "     -D verifyrep.peer.hbase.security.authentication=simple \\\n"
889        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
890        + "2181:/cluster-b \\\n" + "     TestTable");
891    System.err.println();
892    System.err.println(" To verify the data in TestTable between"
893      + " the secured cluster runs VerifyReplication and secured cluster-b");
894    System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E"
895      + ", for master and regionserver kerberos principal from another cluster");
896    System.err
897      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
898        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
899        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
900        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
901        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
902        + "2181:/cluster-b \\\n" + "     TestTable");
903    System.err.println();
904    System.err.println(
905      " To verify the data in TestTable between the insecure cluster runs VerifyReplication"
906        + " and secured cluster-b");
907    System.err
908      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
909        + "     -D verifyrep.peer.hbase.security.authentication=kerberos \\\n"
910        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
911        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
912        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
913        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
914        + "2181:/cluster-b \\\n" + "     TestTable");
915  }
916
917  private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
918    if (maxThreads == 0) {
919      return null;
920    }
921
922    return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
923      buildRejectedReComparePolicy(context));
924  }
925
926  private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
927    return new CallerRunsPolicy() {
928      @Override
929      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
930        LOG.debug("Re-comparison execution rejected. Running in main thread.");
931        context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
932        // will run in the current thread
933        super.rejectedExecution(runnable, e);
934      }
935    };
936  }
937
938  @Override
939  public int run(String[] args) throws Exception {
940    Configuration conf = this.getConf();
941    Job job = createSubmittableJob(conf, args);
942    if (job != null) {
943      return job.waitForCompletion(true) ? 0 : 1;
944    }
945    return 1;
946  }
947
948  /**
949   * Main entry point.
950   * @param args The command line parameters.
951   * @throws Exception When running the job fails.
952   */
953  public static void main(String[] args) throws Exception {
954    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
955    System.exit(res);
956  }
957}