/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
 * This map-only job compares the data from a local table with a remote one.
 * Every cell is compared and must have exactly the same keys (even timestamp)
 * as well as the same value. It is possible to restrict the job by time range
 * and families. The peer id that's provided must match the one given when the
 * replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
 * why a row is different is shown in the map's log.
 */
@InterfaceAudience.Private
public class VerifyReplication extends Configured implements Tool {

  private static final Logger LOG =
      LoggerFactory.getLogger(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  int batch = -1;
  int versions = -1;
  String tableName = null;
  String families = null;
  String delimiter = "";
  String peerId = null;
  String peerQuorumAddress = null;
  String rowPrefixes = null;
  int sleepMsBeforeReCompare = 0;
  boolean verbose = false;
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;
  // Peer table name
  String peerTableName = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Map-only comparator for two tables.
   */
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

    public enum Counters {
      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
    }
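
    // Per-task state. Everything below is initialized lazily on the first call to map()
    // rather than in setup(), because the peer-side scan must start at the first row key
    // delivered to this task.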
    private Connection sourceConnection;
    private Table sourceTable;
    private Connection replicatedConnection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;

    /**
     * Map method that compares every scanned row with the equivalent from
     * the peer cluster.
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
        Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
        delimiter = conf.get(NAME + ".delimiter", "");
        verbose = conf.getBoolean(NAME + ".verbose", false);
        batch = conf.getInt(NAME + ".batch", -1);
        final Scan scan = new Scan();
        if (batch > 0) {
          scan.setBatch(batch);
        }
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
        scan.setRaw(includeDeletedCells);
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        sourceConnection = ConnectionFactory.createConnection(conf);
        sourceTable = sourceConnection.getTable(tableName);

        final InputSplit tableSplit = context.getInputSplit();

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
        TableName peerTableName = TableName.valueOf(peerName);
        replicatedConnection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = replicatedConnection.getTable(peerTableName);
        // Restrict the peer-side scan to the same key range as this task's input split.
        scan.setStartRow(value.getRow());

        byte[] endRow = null;
        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
              .getEndKey();
        } else {
          endRow = ((TableSplit) tableSplit).getEndRow();
        }

        scan.setStopRow(endRow);

        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
        if (peerSnapshotName != null) {
          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
          FileSystem.setDefaultUri(peerConf, peerFSAddress);
          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" +
              peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) +
              " peerFSAddress:" + peerFSAddress);

          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
        } else {
          replicatedScanner = replicatedTable.getScanner(scan);
        }
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
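      // Both scanners return rows in sorted row-key order, so the loop below is a
      // sorted-merge: equal keys are compared cell by cell, a smaller source key means
      // the row is missing from the peer, and a smaller peer key means the row exists
      // only in the peer (advance the peer scanner and compare again).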
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // reached the region end of the peer table; row exists only in the source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // rowkey is the same, need to compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key: " + delimiter
                  + Bytes.toStringBinary(value.getRow()) + delimiter);
            }
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // row only exists in peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }
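
    /**
     * Records a bad row under the given counter (as well as BADROWS) and logs its key.
     * If a recompare sleep is configured, the row is first re-fetched from both tables
     * and compared again, so that rows that were merely lagging behind replication are
     * counted as good instead.
     */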
    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      if (sleepMsBeforeReCompare > 0) {
        Threads.sleep(sleepMsBeforeReCompare);
        try {
          Result sourceResult = sourceTable.get(new Get(row.getRow()));
          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
          Result.compareResults(sourceResult, replicatedResult);
          if (!sourceResult.isEmpty()) {
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key (with recompare): " + delimiter
                  + Bytes.toStringBinary(row.getRow()) + delimiter);
            }
          }
          return;
        } catch (Exception e) {
          LOG.error("recompare failed after sleep, rowkey=" + delimiter +
              Bytes.toStringBinary(row.getRow()) + delimiter);
        }
      }
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
          delimiter);
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
          // Any rows left in the peer scanner exist only in the peer table.
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
                currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }

      if (sourceTable != null) {
        try {
          sourceTable.close();
        } catch (IOException e) {
          LOG.error("failed to close source table in cleanup", e);
        }
      }
      if (sourceConnection != null) {
        try {
          sourceConnection.close();
        } catch (Exception e) {
          LOG.error("failed to close source connection in cleanup", e);
        }
      }

      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("failed to close replicated table in cleanup", e);
        }
      }
      if (replicatedConnection != null) {
        try {
          replicatedConnection.close();
        } catch (Exception e) {
          LOG.error("failed to close replicated connection in cleanup", e);
        }
      }
    }
  }

  /**
   * Reads the replication peer's configuration from the local cluster's ZooKeeper and
   * pairs it with the peer cluster's Configuration.
   */
  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
      final Configuration conf, String peerId) throws IOException {
    ZKWatcher localZKW = null;
    try {
      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
        }

        @Override
        public boolean isAborted() {
          return false;
        }
      });
      ReplicationPeerStorage storage =
          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
      return Pair.newPair(peerConfig,
          ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
    } catch (ReplicationException e) {
      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
          e);
    } finally {
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }
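
  /**
   * Restores the peer table snapshot into the temp directory on the peer cluster's
   * filesystem, so that the map tasks can scan it directly.
   */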
  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
      throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    FileSystem.setDefaultUri(peerConf, peerFSAddress);
    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
    FileSystem fs = FileSystem.get(peerConf);
    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
        new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
  }

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
    conf.set(NAME + ".delimiter", delimiter);
    conf.setInt(NAME + ".batch", batch);
    conf.setBoolean(NAME + ".verbose", verbose);
    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

    String peerQuorumAddress;
    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
    if (peerId != null) {
      peerConfigPair = getPeerQuorumConfig(conf, peerId);
      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
      peerQuorumAddress = peerConfig.getClusterKey();
      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
          peerConfig.getConfiguration());
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
          peerConfig.getConfiguration().entrySet());
    } else {
      assert this.peerQuorumAddress != null;
      peerQuorumAddress = this.peerQuorumAddress;
      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    }

    if (peerTableName != null) {
      LOG.info("Peer Table Name: " + peerTableName);
      conf.set(NAME + ".peerTableName", peerTableName);
    }

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    // Set snapshot specific parameters
    if (peerSnapshotName != null) {
      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);

      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
      // restore snapshot.
      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
      peerSnapshotTmpDir = restoreDir.toString();
      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);

      conf.set(NAME + ".peerFSAddress", peerFSAddress);
      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);

      // This is to create an HDFS delegation token for the peer cluster in case of
      // secured clusters
      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    scan.setRaw(includeDeletedCells);
    scan.setCacheBlocks(false);
    if (batch > 0) {
      scan.setBatch(batch);
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    if (sourceSnapshotName != null) {
      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
      LOG.info(
        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
        null, job, true, snapshotTempPath);
      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
    }

    if (peerId != null) {
      assert peerConfigPair != null;
      Configuration peerClusterConf = peerConfigPair.getSecond();
      // Obtain the auth token from the peer cluster
      TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
    }

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    // The stop row is the last (greatest) prefix with its final byte incremented by one,
    // i.e. the first row key that can no longer match any of the sorted prefixes (for
    // example, prefix "abc" yields the exclusive stop row "abd").
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
        new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
    scan.setStopRow(stopRow);
  }
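
  /**
   * Parses the command line parameters into this instance's fields.
   * @param args The command line parameters to parse.
   * @return true if the job can be started, false if the arguments are invalid or
   *         help was requested.
   */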
  @VisibleForTesting
  public boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String includeDeletedCellsArgKey = "--raw";
        if (cmd.equals(includeDeletedCellsArgKey)) {
          includeDeletedCells = true;
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        final String sleepToReCompareKey = "--recomparesleep=";
        if (cmd.startsWith(sleepToReCompareKey)) {
          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
          continue;
        }

        final String verboseKey = "--verbose";
        if (cmd.startsWith(verboseKey)) {
          verbose = true;
          continue;
        }

        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
          continue;
        }

        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        if (cmd.startsWith(peerSnapshotNameArgKey)) {
          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
          continue;
        }

        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerFSAddressArgKey = "--peerFSAddress=";
        if (cmd.startsWith(peerFSAddressArgKey)) {
          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
          continue;
        }

        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
          continue;
        }

        final String peerTableNameArgKey = "--peerTableName=";
        if (cmd.startsWith(peerTableNameArgKey)) {
          peerTableName = cmd.substring(peerTableNameArgKey.length());
          continue;
        }

        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }
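        // The last two positional arguments are the peer (either a peer id or a full
        // ZooKeeper cluster key) and the table name; a valid cluster key is detected
        // by attempting to parse the argument as one.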
        if (i == args.length - 2) {
          if (isPeerQuorumAddress(cmd)) {
            peerQuorumAddress = cmd;
          } else {
            peerId = cmd;
          }
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }

      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
        printUsage("Source snapshot name and snapshot temp location should be provided"
            + " to use snapshots in source cluster");
        return false;
      }

      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
          || peerHBaseRootAddress != null) {
        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
            || peerHBaseRootAddress == null) {
          printUsage(
            "Peer snapshot name, peer snapshot temp location, peer HBase root address and "
                + "peer FS address should be provided to use snapshots in peer cluster");
          return false;
        }
      }

      // This is to avoid making recompare calls to source/peer tables when snapshots are used
      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
        printUsage(
          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are "
              + "immutable");
        return false;
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  private boolean isPeerQuorumAddress(String cmd) {
    try {
      ZKConfig.validateClusterKey(cmd);
    } catch (IOException e) {
      // not a quorum address
      return false;
    }
    return true;
  }

  /**
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]"
        + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
        + "[--batch=] [--versions=] [--raw] [--verbose] [--peerTableName=] "
        + "[--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
        + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] "
        + "<peerid|peerQuorumAddress> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        batch count for scan, note that result row counts will no"
        + " longer be actual number of rows when you use this option");
    System.err.println(" raw          includes raw scan if given in options");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
    System.err.println(" delimiter    the delimiter used in display around rowkey");
    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, default"
        + " value is 0 which disables the recompare.");
    System.err.println(" verbose      logs row keys of good rows");
    System.err.println(" peerTableName        Peer Table Name");
    System.err.println(" sourceSnapshotName   Source Snapshot Name");
    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName     Peer Snapshot Name");
    System.err.println(" peerSnapshotTmpDir   Tmp location to restore peer table snapshot");
    System.err.println(" peerFSAddress        Peer cluster Hadoop FS address");
    System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification,"
        + " must match the one given for replication");
    System.err.println(" peerQuorumAddress   quorumAddress of the peer used for verification."
        + " The format is zk_quorum:zk_port:zk_hbase_path");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(
      " To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}