001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce.replication;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.conf.Configured;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.ConnectionFactory;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.ResultScanner;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableSnapshotScanner;
044import org.apache.hadoop.hbase.filter.Filter;
045import org.apache.hadoop.hbase.filter.FilterList;
046import org.apache.hadoop.hbase.filter.PrefixFilter;
047import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
048import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
049import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
050import org.apache.hadoop.hbase.mapreduce.TableMapper;
051import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
052import org.apache.hadoop.hbase.mapreduce.TableSplit;
053import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
054import org.apache.hadoop.hbase.replication.ReplicationException;
055import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
056import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
057import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
058import org.apache.hadoop.hbase.replication.ReplicationUtils;
059import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.hadoop.hbase.util.Pair;
063import org.apache.hadoop.hbase.zookeeper.ZKConfig;
064import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
065import org.apache.hadoop.mapreduce.InputSplit;
066import org.apache.hadoop.mapreduce.Job;
067import org.apache.hadoop.mapreduce.MRJobConfig;
068import org.apache.hadoop.mapreduce.Mapper;
069import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
070import org.apache.hadoop.util.Tool;
071import org.apache.hadoop.util.ToolRunner;
072import org.apache.yetus.audience.InterfaceAudience;
073import org.slf4j.Logger;
074import org.slf4j.LoggerFactory;
075
076/**
077 * This map-only job compares the data from a local table with a remote one. Every cell is compared
078 * and must have exactly the same keys (even timestamp) as well as same value. It is possible to
079 * restrict the job by time range and families. The peer id that's provided must match the one given
080 * when the replication stream was setup.
081 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason why a row is
 * different is shown in the map's log.
084 */
085@InterfaceAudience.Private
086public class VerifyReplication extends Configured implements Tool {
087
088  private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class);
089
090  public final static String NAME = "verifyrep";
091  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
092  private static ThreadPoolExecutor reCompareExecutor = null;
093  int reCompareTries = 0;
094  int reCompareBackoffExponent = 0;
095  int reCompareThreads = 0;
096  int sleepMsBeforeReCompare = 0;
097  long startTime = 0;
098  long endTime = Long.MAX_VALUE;
099  int batch = -1;
100  int versions = -1;
101  String tableName = null;
102  String families = null;
103  String delimiter = "";
104  String peerId = null;
105  String peerQuorumAddress = null;
106  String rowPrefixes = null;
107  boolean verbose = false;
108  boolean includeDeletedCells = false;
109  // Source table snapshot name
110  String sourceSnapshotName = null;
111  // Temp location in source cluster to restore source snapshot
112  String sourceSnapshotTmpDir = null;
113  // Peer table snapshot name
114  String peerSnapshotName = null;
115  // Temp location in peer cluster to restore peer snapshot
116  String peerSnapshotTmpDir = null;
117  // Peer cluster Hadoop FS address
118  String peerFSAddress = null;
119  // Peer cluster HBase root dir location
120  String peerHBaseRootAddress = null;
121  // Peer Table Name
122  String peerTableName = null;
123
124  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
125
126  /**
127   * Map-only comparator for 2 tables
128   */
129  public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> {
130
131    public enum Counters {
132      GOODROWS,
133      BADROWS,
134      ONLY_IN_SOURCE_TABLE_ROWS,
135      ONLY_IN_PEER_TABLE_ROWS,
136      CONTENT_DIFFERENT_ROWS,
137      RECOMPARES,
138      MAIN_THREAD_RECOMPARES,
139      SOURCE_ROW_CHANGED,
140      PEER_ROW_CHANGED,
141      FAILED_RECOMPARE
142    }
143
144    private Connection sourceConnection;
145    private Table sourceTable;
146    private Connection replicatedConnection;
147    private Table replicatedTable;
148    private ResultScanner replicatedScanner;
149    private Result currentCompareRowInPeerTable;
150    private Scan tableScan;
151    private int reCompareTries;
152    private int reCompareBackoffExponent;
153    private int sleepMsBeforeReCompare;
154    private String delimiter = "";
155    private boolean verbose = false;
156    private int batch = -1;
157
158    /**
159     * Map method that compares every scanned row with the equivalent from a distant cluster.
160     * @param row     The current table row key.
161     * @param value   The columns.
162     * @param context The current context.
163     * @throws IOException When something is broken with the data.
164     */
165    @Override
166    public void map(ImmutableBytesWritable row, final Result value, Context context)
167      throws IOException {
168      if (replicatedScanner == null) {
169        Configuration conf = context.getConfiguration();
170        reCompareTries = conf.getInt(NAME + ".recompareTries", 0);
171        reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1);
172        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
173        if (sleepMsBeforeReCompare > 0) {
174          reCompareTries = Math.max(reCompareTries, 1);
175        }
176        delimiter = conf.get(NAME + ".delimiter", "");
177        verbose = conf.getBoolean(NAME + ".verbose", false);
178        batch = conf.getInt(NAME + ".batch", -1);
179        final Scan scan = new Scan();
180        if (batch > 0) {
181          scan.setBatch(batch);
182        }
183        scan.setCacheBlocks(false);
184        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
185        long startTime = conf.getLong(NAME + ".startTime", 0);
186        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
187        String families = conf.get(NAME + ".families", null);
188        if (families != null) {
189          String[] fams = families.split(",");
190          for (String fam : fams) {
191            scan.addFamily(Bytes.toBytes(fam));
192          }
193        }
194        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
195        scan.setRaw(includeDeletedCells);
196        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
197        setRowPrefixFilter(scan, rowPrefixes);
198        scan.setTimeRange(startTime, endTime);
199        int versions = conf.getInt(NAME + ".versions", -1);
200        LOG.info("Setting number of version inside map as: " + versions);
201        if (versions >= 0) {
202          scan.readVersions(versions);
203        }
204        int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0);
205        reCompareExecutor = buildReCompareExecutor(reCompareThreads, context);
206        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
207        sourceConnection = ConnectionFactory.createConnection(conf);
208        sourceTable = sourceConnection.getTable(tableName);
209        tableScan = scan;
210
211        final InputSplit tableSplit = context.getInputSplit();
212
213        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
214        Configuration peerConf =
215          HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX);
216
217        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
218        TableName peerTableName = TableName.valueOf(peerName);
219        replicatedConnection = ConnectionFactory.createConnection(peerConf);
220        replicatedTable = replicatedConnection.getTable(peerTableName);
221        scan.withStartRow(value.getRow());
222
223        byte[] endRow = null;
224        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
225          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegion()
226            .getEndKey();
227        } else {
228          endRow = ((TableSplit) tableSplit).getEndRow();
229        }
230
231        scan.withStopRow(endRow);
232
233        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
234        if (peerSnapshotName != null) {
235          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
236          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
237          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
238          FileSystem.setDefaultUri(peerConf, peerFSAddress);
239          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
240          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
241            + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf)
242            + " peerFSAddress:" + peerFSAddress);
243
244          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
245            new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
246        } else {
247          replicatedScanner = replicatedTable.getScanner(scan);
248        }
249        currentCompareRowInPeerTable = replicatedScanner.next();
250      }
251      while (true) {
252        if (currentCompareRowInPeerTable == null) {
253          // reach the region end of peer table, row only in source table
254          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
255          break;
256        }
257        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
258        if (rowCmpRet == 0) {
259          // rowkey is same, need to compare the content of the row
260          try {
261            Result.compareResults(value, currentCompareRowInPeerTable, false);
262            context.getCounter(Counters.GOODROWS).increment(1);
263            if (verbose) {
264              LOG.info(
265                "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
266            }
267          } catch (Exception e) {
268            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
269              currentCompareRowInPeerTable);
270          }
271          currentCompareRowInPeerTable = replicatedScanner.next();
272          break;
273        } else if (rowCmpRet < 0) {
274          // row only exists in source table
275          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null);
276          break;
277        } else {
278          // row only exists in peer table
279          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
280            currentCompareRowInPeerTable);
281          currentCompareRowInPeerTable = replicatedScanner.next();
282        }
283      }
284    }
285
286    @SuppressWarnings("FutureReturnValueIgnored")
287    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
288      Result replicatedRow) {
289      byte[] rowKey = getRow(row, replicatedRow);
290      if (reCompareTries == 0) {
291        context.getCounter(counter).increment(1);
292        context.getCounter(Counters.BADROWS).increment(1);
293        LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter);
294        return;
295      }
296
297      VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context,
298        row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable,
299        reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose);
300
301      if (reCompareExecutor == null) {
302        runnable.run();
303        return;
304      }
305
306      reCompareExecutor.submit(runnable);
307    }
308
309    @Override
310    protected void cleanup(Context context) {
311      if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) {
312        reCompareExecutor.shutdown();
313        try {
314          boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
315          if (!terminated) {
316            List<Runnable> queue = reCompareExecutor.shutdownNow();
317            for (Runnable runnable : queue) {
318              ((VerifyReplicationRecompareRunnable) runnable).fail();
319            }
320
321            terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES);
322
323            if (!terminated) {
324              int activeCount = Math.max(1, reCompareExecutor.getActiveCount());
325              LOG.warn("Found {} possible recompares still running in the executable"
326                + " incrementing BADROWS and FAILED_RECOMPARE", activeCount);
327              context.getCounter(Counters.BADROWS).increment(activeCount);
328              context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount);
329            }
330          }
331        } catch (InterruptedException e) {
332          throw new RuntimeException("Failed to await executor termination in cleanup", e);
333        }
334      }
335      if (replicatedScanner != null) {
336        try {
337          while (currentCompareRowInPeerTable != null) {
338            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null,
339              currentCompareRowInPeerTable);
340            currentCompareRowInPeerTable = replicatedScanner.next();
341          }
342        } catch (Exception e) {
343          LOG.error("fail to scan peer table in cleanup", e);
344        } finally {
345          replicatedScanner.close();
346          replicatedScanner = null;
347        }
348      }
349
350      if (sourceTable != null) {
351        try {
352          sourceTable.close();
353        } catch (IOException e) {
354          LOG.error("fail to close source table in cleanup", e);
355        }
356      }
357      if (sourceConnection != null) {
358        try {
359          sourceConnection.close();
360        } catch (Exception e) {
361          LOG.error("fail to close source connection in cleanup", e);
362        }
363      }
364
365      if (replicatedTable != null) {
366        try {
367          replicatedTable.close();
368        } catch (Exception e) {
369          LOG.error("fail to close replicated table in cleanup", e);
370        }
371      }
372      if (replicatedConnection != null) {
373        try {
374          replicatedConnection.close();
375        } catch (Exception e) {
376          LOG.error("fail to close replicated connection in cleanup", e);
377        }
378      }
379    }
380  }
381
382  private static Pair<ReplicationPeerConfig, Configuration>
383    getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
384    ZKWatcher localZKW = null;
385    try {
386      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
387        @Override
388        public void abort(String why, Throwable e) {
389        }
390
391        @Override
392        public boolean isAborted() {
393          return false;
394        }
395      });
396      ReplicationPeerStorage storage =
397        ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
398      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
399      return Pair.newPair(peerConfig,
400        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
401    } catch (ReplicationException e) {
402      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
403        e);
404    } finally {
405      if (localZKW != null) {
406        localZKW.close();
407      }
408    }
409  }
410
411  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
412    throws IOException {
413    Configuration peerConf =
414      HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
415    FileSystem.setDefaultUri(peerConf, peerFSAddress);
416    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
417    FileSystem fs = FileSystem.get(peerConf);
418    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
419      new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
420  }
421
422  /**
423   * Sets up the actual job.
424   * @param conf The current configuration.
425   * @param args The command line parameters.
426   * @return The newly created job.
427   * @throws java.io.IOException When setting up the job fails.
428   */
429  public Job createSubmittableJob(Configuration conf, String[] args) throws IOException {
430    if (!doCommandLine(args)) {
431      return null;
432    }
433    conf.set(NAME + ".tableName", tableName);
434    conf.setLong(NAME + ".startTime", startTime);
435    conf.setLong(NAME + ".endTime", endTime);
436    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
437    conf.set(NAME + ".delimiter", delimiter);
438    conf.setInt(NAME + ".batch", batch);
439    conf.setBoolean(NAME + ".verbose", verbose);
440    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
441    if (families != null) {
442      conf.set(NAME + ".families", families);
443    }
444    if (rowPrefixes != null) {
445      conf.set(NAME + ".rowPrefixes", rowPrefixes);
446    }
447
448    String peerQuorumAddress;
449    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
450    if (peerId != null) {
451      peerConfigPair = getPeerQuorumConfig(conf, peerId);
452      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
453      peerQuorumAddress = peerConfig.getClusterKey();
454      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
455        + peerConfig.getConfiguration());
456      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
457      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
458        peerConfig.getConfiguration().entrySet());
459    } else {
460      assert this.peerQuorumAddress != null;
461      peerQuorumAddress = this.peerQuorumAddress;
462      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
463      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
464    }
465
466    if (peerTableName != null) {
467      LOG.info("Peer Table Name: " + peerTableName);
468      conf.set(NAME + ".peerTableName", peerTableName);
469    }
470
471    conf.setInt(NAME + ".versions", versions);
472    LOG.info("Number of version: " + versions);
473
474    conf.setInt(NAME + ".recompareTries", reCompareTries);
475    conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
476    conf.setInt(NAME + ".recompareThreads", reCompareThreads);
477
478    // Set Snapshot specific parameters
479    if (peerSnapshotName != null) {
480      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
481
482      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
483      // restore snapshot.
484      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
485      peerSnapshotTmpDir = restoreDir.toString();
486      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
487
488      conf.set(NAME + ".peerFSAddress", peerFSAddress);
489      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
490
491      // This is to create HDFS delegation token for peer cluster in case of secured
492      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
493    }
494
495    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
496    job.setJarByClass(VerifyReplication.class);
497
498    Scan scan = new Scan();
499    scan.setTimeRange(startTime, endTime);
500    scan.setRaw(includeDeletedCells);
501    scan.setCacheBlocks(false);
502    if (batch > 0) {
503      scan.setBatch(batch);
504    }
505    if (versions >= 0) {
506      scan.readVersions(versions);
507      LOG.info("Number of versions set to " + versions);
508    }
509    if (families != null) {
510      String[] fams = families.split(",");
511      for (String fam : fams) {
512        scan.addFamily(Bytes.toBytes(fam));
513      }
514    }
515
516    setRowPrefixFilter(scan, rowPrefixes);
517
518    if (sourceSnapshotName != null) {
519      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
520      LOG.info(
521        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
522      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
523        null, job, true, snapshotTempPath);
524      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
525    } else {
526      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
527    }
528
529    Configuration peerClusterConf;
530    if (peerId != null) {
531      assert peerConfigPair != null;
532      peerClusterConf = peerConfigPair.getSecond();
533    } else {
534      peerClusterConf =
535        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
536    }
537    // Obtain the auth token from peer cluster
538    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
539
540    job.setOutputFormatClass(NullOutputFormat.class);
541    job.setNumReduceTasks(0);
542    return job;
543  }
544
545  protected static byte[] getRow(Result sourceResult, Result replicatedResult) {
546    if (sourceResult != null) {
547      return sourceResult.getRow();
548    } else if (replicatedResult != null) {
549      return replicatedResult.getRow();
550    }
551    throw new RuntimeException("Both sourceResult and replicatedResult are null!");
552  }
553
554  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
555    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
556      String[] rowPrefixArray = rowPrefixes.split(",");
557      Arrays.sort(rowPrefixArray);
558      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
559      for (String prefix : rowPrefixArray) {
560        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
561        filterList.addFilter(filter);
562      }
563      scan.setFilter(filterList);
564      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
565      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
566      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
567    }
568  }
569
570  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
571    scan.withStartRow(startPrefixRow);
572    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
573      new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
574    scan.withStopRow(stopRow);
575  }
576
577  public boolean doCommandLine(final String[] args) {
578    if (args.length < 2) {
579      printUsage(null);
580      return false;
581    }
582    try {
583      for (int i = 0; i < args.length; i++) {
584        String cmd = args[i];
585        if (cmd.equals("-h") || cmd.startsWith("--h")) {
586          printUsage(null);
587          return false;
588        }
589
590        final String startTimeArgKey = "--starttime=";
591        if (cmd.startsWith(startTimeArgKey)) {
592          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
593          continue;
594        }
595
596        final String endTimeArgKey = "--endtime=";
597        if (cmd.startsWith(endTimeArgKey)) {
598          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
599          continue;
600        }
601
602        final String includeDeletedCellsArgKey = "--raw";
603        if (cmd.equals(includeDeletedCellsArgKey)) {
604          includeDeletedCells = true;
605          continue;
606        }
607
608        final String versionsArgKey = "--versions=";
609        if (cmd.startsWith(versionsArgKey)) {
610          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
611          continue;
612        }
613
614        final String batchArgKey = "--batch=";
615        if (cmd.startsWith(batchArgKey)) {
616          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
617          continue;
618        }
619
620        final String familiesArgKey = "--families=";
621        if (cmd.startsWith(familiesArgKey)) {
622          families = cmd.substring(familiesArgKey.length());
623          continue;
624        }
625
626        final String rowPrefixesKey = "--row-prefixes=";
627        if (cmd.startsWith(rowPrefixesKey)) {
628          rowPrefixes = cmd.substring(rowPrefixesKey.length());
629          continue;
630        }
631
632        final String delimiterArgKey = "--delimiter=";
633        if (cmd.startsWith(delimiterArgKey)) {
634          delimiter = cmd.substring(delimiterArgKey.length());
635          continue;
636        }
637
638        final String deprecatedSleepToReCompareKey = "--recomparesleep=";
639        final String sleepToReCompareKey = "--recompareSleep=";
640        if (cmd.startsWith(deprecatedSleepToReCompareKey)) {
641          LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0."
642            + " Use --recompareSleep instead.");
643          sleepMsBeforeReCompare =
644            Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length()));
645          continue;
646        }
647        if (cmd.startsWith(sleepToReCompareKey)) {
648          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
649          continue;
650        }
651
652        final String verboseKey = "--verbose";
653        if (cmd.startsWith(verboseKey)) {
654          verbose = true;
655          continue;
656        }
657
658        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
659        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
660          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
661          continue;
662        }
663
664        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
665        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
666          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
667          continue;
668        }
669
670        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
671        if (cmd.startsWith(peerSnapshotNameArgKey)) {
672          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
673          continue;
674        }
675
676        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
677        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
678          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
679          continue;
680        }
681
682        final String peerFSAddressArgKey = "--peerFSAddress=";
683        if (cmd.startsWith(peerFSAddressArgKey)) {
684          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
685          continue;
686        }
687
688        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
689        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
690          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
691          continue;
692        }
693
694        final String peerTableNameArgKey = "--peerTableName=";
695        if (cmd.startsWith(peerTableNameArgKey)) {
696          peerTableName = cmd.substring(peerTableNameArgKey.length());
697          continue;
698        }
699
700        final String reCompareThreadArgs = "--recompareThreads=";
701        if (cmd.startsWith(reCompareThreadArgs)) {
702          reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length()));
703          continue;
704        }
705
706        final String reCompareTriesKey = "--recompareTries=";
707        if (cmd.startsWith(reCompareTriesKey)) {
708          reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length()));
709          continue;
710        }
711
712        final String reCompareBackoffExponentKey = "--recompareBackoffExponent=";
713        if (cmd.startsWith(reCompareBackoffExponentKey)) {
714          reCompareBackoffExponent =
715            Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length()));
716          continue;
717        }
718
719        if (cmd.startsWith("--")) {
720          printUsage("Invalid argument '" + cmd + "'");
721          return false;
722        }
723
724        if (i == args.length - 2) {
725          if (isPeerQuorumAddress(cmd)) {
726            peerQuorumAddress = cmd;
727          } else {
728            peerId = cmd;
729          }
730        }
731
732        if (i == args.length - 1) {
733          tableName = cmd;
734        }
735      }
736
737      if (
738        (sourceSnapshotName != null && sourceSnapshotTmpDir == null)
739          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)
740      ) {
741        printUsage("Source snapshot name and snapshot temp location should be provided"
742          + " to use snapshots in source cluster");
743        return false;
744      }
745
746      if (
747        peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
748          || peerHBaseRootAddress != null
749      ) {
750        if (
751          peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
752            || peerHBaseRootAddress == null
753        ) {
754          printUsage(
755            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
756              + "peer FSAddress should be provided to use snapshots in peer cluster");
757          return false;
758        }
759      }
760
761      // This is to avoid making recompare calls to source/peer tables when snapshots are used
762      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
763        printUsage(
764          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
765            + " immutable");
766        return false;
767      }
768
769    } catch (Exception e) {
770      LOG.error("Failed to parse commandLine arguments", e);
771      printUsage("Can't start because " + e.getMessage());
772      return false;
773    }
774    return true;
775  }
776
777  private boolean isPeerQuorumAddress(String cmd) {
778    try {
779      ZKConfig.validateClusterKey(cmd);
780    } catch (IOException e) {
781      // not a quorum address
782      return false;
783    }
784    return true;
785  }
786
787  /*
788   * @param errorMsg Error message. Can be null.
789   */
790  private static void printUsage(final String errorMsg) {
791    if (errorMsg != null && errorMsg.length() > 0) {
792      System.err.println("ERROR: " + errorMsg);
793    }
794    System.err.println("Usage: verifyrep [--starttime=X]"
795      + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] "
796      + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]"
797      + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
798      + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
799      + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
800    System.err.println();
801    System.err.println("Options:");
802    System.err.println(" starttime    beginning of the time range");
803    System.err.println("              without endtime means from starttime to forever");
804    System.err.println(" endtime      end of the time range");
805    System.err.println(" versions     number of cell versions to verify");
806    System.err.println(" batch        batch count for scan, note that"
807      + " result row counts will no longer be actual number of rows when you use this option");
808    System.err.println(" raw          includes raw scan if given in options");
809    System.err.println(" families     comma-separated list of families to copy");
810    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
811    System.err.println(" delimiter    the delimiter used in display around rowkey");
812    System.err.println(" recompareSleep   milliseconds to sleep before recompare row, "
813      + "default value is 0 which disables the recompare.");
814    System.err.println(" recompareThreads number of threads to run recompares in");
815    System.err.println(" recompareTries   number of recompare attempts before incrementing "
816      + "the BADROWS counter. Defaults to 1 recompare");
817    System.out.println(" recompareBackoffExponent exponential multiplier to increase "
818      + "recompareSleep after each recompare attempt, "
819      + "default value is 0 which results in a constant sleep time");
820    System.err.println(" verbose      logs row keys of good rows");
821    System.err.println(" peerTableName  Peer Table Name");
822    System.err.println(" sourceSnapshotName  Source Snapshot Name");
823    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
824    System.err.println(" peerSnapshotName  Peer Snapshot Name");
825    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
826    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
827    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
828    System.err.println();
829    System.err.println("Args:");
830    System.err.println(" peerid       Id of the peer used for verification,"
831      + " must match the one given for replication");
832    System.err.println(" peerQuorumAddress   quorumAdress of the peer used for verification. The "
833      + "format is zk_quorum:zk_port:zk_hbase_path");
834    System.err.println(" tablename    Name of the table to verify");
835    System.err.println();
836    System.err.println("Examples:");
837    System.err
838      .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
839    System.err
840      .println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication"
841        + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
842    System.err.println();
843    System.err.println(
844      " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
845    System.err.println(" Assume quorum address for cluster-b is"
846      + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
847    System.err
848      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
849        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
850        + "2181:/cluster-b \\\n" + "     TestTable");
851    System.err.println();
852    System.err
853      .println(" To verify the data in TestTable between the secured cluster runs VerifyReplication"
854        + " and insecure cluster-b");
855    System.err
856      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
857        + "     -D verifyrep.peer.hbase.security.authentication=simple \\\n"
858        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
859        + "2181:/cluster-b \\\n" + "     TestTable");
860    System.err.println();
861    System.err.println(" To verify the data in TestTable between"
862      + " the secured cluster runs VerifyReplication and secured cluster-b");
863    System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E"
864      + ", for master and regionserver kerberos principal from another cluster");
865    System.err
866      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
867        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
868        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
869        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
870        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
871        + "2181:/cluster-b \\\n" + "     TestTable");
872    System.err.println();
873    System.err.println(
874      " To verify the data in TestTable between the insecure cluster runs VerifyReplication"
875        + " and secured cluster-b");
876    System.err
877      .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n"
878        + "     -D verifyrep.peer.hbase.security.authentication=kerberos \\\n"
879        + "     -D verifyrep.peer.hbase.regionserver.kerberos.principal="
880        + "cluster-b/_HOST@EXAMPLE.COM \\\n"
881        + "     -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n"
882        + "     cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
883        + "2181:/cluster-b \\\n" + "     TestTable");
884  }
885
886  private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
887    if (maxThreads == 0) {
888      return null;
889    }
890
891    return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(),
892      buildRejectedReComparePolicy(context));
893  }
894
895  private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
896    return new CallerRunsPolicy() {
897      @Override
898      public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
899        LOG.debug("Re-comparison execution rejected. Running in main thread.");
900        context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1);
901        // will run in the current thread
902        super.rejectedExecution(runnable, e);
903      }
904    };
905  }
906
907  @Override
908  public int run(String[] args) throws Exception {
909    Configuration conf = this.getConf();
910    Job job = createSubmittableJob(conf, args);
911    if (job != null) {
912      return job.waitForCompletion(true) ? 0 : 1;
913    }
914    return 1;
915  }
916
917  /**
918   * Main entry point.
919   * @param args The command line parameters.
920   * @throws Exception When running the job fails.
921   */
922  public static void main(String[] args) throws Exception {
923    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
924    System.exit(res);
925  }
926}