/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
 * This map-only job compares the data from a local table with a remote one.
 * Every cell is compared and must have exactly the same keys (including
 * timestamp) as well as the same value. It is possible to restrict the job by
 * time range and families. The peer id that's provided must match the one
 * given when the replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The
 * reason why a row is different is shown in the map's log.
 */
@InterfaceAudience.Private
public class VerifyReplication extends Configured implements Tool {

  private static final Logger LOG =
      LoggerFactory.getLogger(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  int batch = -1;
  int versions = -1;
  String tableName = null;
  String families = null;
  String delimiter = "";
  String peerId = null;
  String rowPrefixes = null;
  int sleepMsBeforeReCompare = 0;
  boolean verbose = false;
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
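
  // Typical invocation (this mirrors the example printed by printUsage below;
  // the timestamps and peer id are illustrative):
  //
  //   $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
  //       --starttime=1265875194289 --endtime=1265878794289 5 TestTable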

  /**
   * Map-only comparator for 2 tables
   */
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

    public enum Counters {
      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
    }

    private Connection sourceConnection;
    private Table sourceTable;
    private Connection replicatedConnection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;

    /**
     * Map method that compares every scanned row with the equivalent from
     * a distant cluster.
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
        Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
        delimiter = conf.get(NAME + ".delimiter", "");
        verbose = conf.getBoolean(NAME + ".verbose", false);
        batch = conf.getInt(NAME + ".batch", -1);
        final Scan scan = new Scan();
        if (batch > 0) {
          scan.setBatch(batch);
        }
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
        scan.setRaw(includeDeletedCells);
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        sourceConnection = ConnectionFactory.createConnection(conf);
        sourceTable = sourceConnection.getTable(tableName);

        final InputSplit tableSplit = context.getInputSplit();

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        replicatedConnection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = replicatedConnection.getTable(tableName);
        scan.setStartRow(value.getRow());

        byte[] endRow = null;
        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
              .getEndKey();
        } else {
          endRow = ((TableSplit) tableSplit).getEndRow();
        }

        scan.setStopRow(endRow);

        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
        if (peerSnapshotName != null) {
          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
          FileSystem.setDefaultUri(peerConf, peerFSAddress);
          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
              + " peerFSAddress:" + peerFSAddress);

          replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
        } else {
          replicatedScanner = replicatedTable.getScanner(scan);
        }
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
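      // Both scans return rows in sorted row-key order, so the loop below is a
      // sorted merge: advance the peer scanner until its current row key is
      // greater than or equal to the source row, classifying each row as
      // matching, only-in-source, or only-in-peer along the way.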
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // reached the region end of the peer table, row only in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // rowkey is same, need to compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key: " + delimiter
                  + Bytes.toStringBinary(value.getRow()) + delimiter);
            }
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // row only exists in peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      if (sleepMsBeforeReCompare > 0) {
        Threads.sleep(sleepMsBeforeReCompare);
        try {
          Result sourceResult = sourceTable.get(new Get(row.getRow()));
          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
          Result.compareResults(sourceResult, replicatedResult);
          if (!sourceResult.isEmpty()) {
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key (with recompare): " + delimiter
                  + Bytes.toStringBinary(row.getRow()) + delimiter);
            }
          }
          return;
        } catch (Exception e) {
          LOG.error("recompare failed after sleep, rowkey=" + delimiter +
              Bytes.toStringBinary(row.getRow()) + delimiter);
        }
      }
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
          delimiter);
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
                currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }

      if (sourceTable != null) {
        try {
          sourceTable.close();
        } catch (IOException e) {
          LOG.error("failed to close source table in cleanup", e);
        }
      }
      if (sourceConnection != null) {
        try {
          sourceConnection.close();
        } catch (Exception e) {
          LOG.error("failed to close source connection in cleanup", e);
        }
      }

      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("failed to close replicated table in cleanup", e);
        }
      }
      if (replicatedConnection != null) {
        try {
          replicatedConnection.close();
        } catch (Exception e) {
          LOG.error("failed to close replicated connection in cleanup", e);
        }
      }
    }
  }
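
  /**
   * Looks up the configured replication peer through the local cluster's
   * ZooKeeper and returns its cluster key and configuration.
   * @param conf The local cluster configuration.
   * @param peerId Id of the replication peer to look up.
   * @return Pair of the peer's {@link ReplicationPeerConfig} and the peer
   *   cluster configuration.
   * @throws IOException When the peer configuration cannot be read.
   */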
  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
      final Configuration conf, String peerId) throws IOException {
    ZKWatcher localZKW = null;
    ReplicationPeerZKImpl peer = null;
    try {
      localZKW = new ZKWatcher(conf, "VerifyReplication",
          new Abortable() {
            @Override public void abort(String why, Throwable e) {}
            @Override public boolean isAborted() {return false;}
          });

      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
      rp.init();

      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
      if (pair == null) {
        throw new IOException("Couldn't get peer conf!");
      }

      return pair;
    } catch (ReplicationException e) {
      throw new IOException(
          "An error occurred while trying to connect to the remote peer cluster", e);
    } finally {
      if (peer != null) {
        peer.close();
      }
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }
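
  /**
   * Restores the peer table snapshot into the temp directory on the peer
   * cluster's filesystem, so the mapper can read it with a
   * {@link TableSnapshotScanner} instead of going through the peer's region
   * servers.
   */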
  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
      throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    FileSystem.setDefaultUri(peerConf, peerFSAddress);
    FSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
    FileSystem fs = FileSystem.get(peerConf);
    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, FSUtils.getRootDir(peerConf),
        new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
  }

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    conf.set(NAME + ".peerId", peerId);
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
    conf.set(NAME + ".delimiter", delimiter);
    conf.setInt(NAME + ".batch", batch);
    conf.setBoolean(NAME + ".verbose", verbose);
    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
    String peerQuorumAddress = peerConfig.getClusterKey();
    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
        peerConfig.getConfiguration());
    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
        peerConfig.getConfiguration().entrySet());

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    // Set snapshot specific parameters
    if (peerSnapshotName != null) {
      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);

      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
      // restore snapshot.
      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
      peerSnapshotTmpDir = restoreDir.toString();
      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);

      conf.set(NAME + ".peerFSAddress", peerFSAddress);
      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);

      // This is to create an HDFS delegation token for the peer cluster in a secured setup
      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    scan.setRaw(includeDeletedCells);
    scan.setCacheBlocks(false);
    if (batch > 0) {
      scan.setBatch(batch);
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    if (sourceSnapshotName != null) {
      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
      LOG.info(
          "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
          null, job, true, snapshotTempPath);
      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
    }
    Configuration peerClusterConf = peerConfigPair.getSecond();
    // Obtain the auth token from peer cluster
    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    // The stop row is the last prefix with its final byte incremented by one,
    // i.e. the first row key that can no longer match the last prefix.
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
        new byte[]{(byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1)});
    scan.setStopRow(stopRow);
  }
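
  // Worked example for setStartAndStopRows, with illustrative prefixes
  // "ab,ac": after sorting, the scan starts at "ab" and stops at "ad", the
  // first key past the last prefix "ac". Note the increment assumes the last
  // byte of the final prefix is not 0xFF.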

  @VisibleForTesting
  public boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String includeDeletedCellsArgKey = "--raw";
        if (cmd.equals(includeDeletedCellsArgKey)) {
          includeDeletedCells = true;
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        final String sleepToReCompareKey = "--recomparesleep=";
        if (cmd.startsWith(sleepToReCompareKey)) {
          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
          continue;
        }

        final String verboseKey = "--verbose";
        if (cmd.startsWith(verboseKey)) {
          verbose = true;
          continue;
        }

        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
          continue;
        }

        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        if (cmd.startsWith(peerSnapshotNameArgKey)) {
          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
          continue;
        }

        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerFSAddressArgKey = "--peerFSAddress=";
        if (cmd.startsWith(peerFSAddressArgKey)) {
          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
          continue;
        }

        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
          continue;
        }

        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }

        if (i == args.length - 2) {
          peerId = cmd;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }

      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
        printUsage("Source snapshot name and snapshot temp location should be provided"
            + " to use snapshots in source cluster");
        return false;
      }

      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
          || peerHBaseRootAddress != null) {
        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
            || peerHBaseRootAddress == null) {
          printUsage("Peer snapshot name, peer snapshot temp location, peer HBase root address"
              + " and peer FS address should be provided to use snapshots in peer cluster");
          return false;
        }
      }

      // This is to avoid making recompare calls to source/peer tables when snapshots are used
      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
        printUsage("Using sleepMsBeforeReCompare along with snapshots is not allowed as"
            + " snapshots are immutable");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }
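
  // Illustrative snapshot-mode invocation satisfying the checks above; the
  // snapshot names, temp dirs, and addresses are placeholders, not defaults:
  //
  //   $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
  //       --sourceSnapshotName=srcSnap --sourceSnapshotTmpDir=/tmp/srcSnapRestore \
  //       --peerSnapshotName=peerSnap --peerSnapshotTmpDir=/tmp/peerSnapRestore \
  //       --peerFSAddress=hdfs://peer-namenode:8020 --peerHBaseRootAddress=/hbase \
  //       5 TestTable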

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]" +
        " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
        "[--batch=] [--versions=N] [--raw] [--verbose] [--sourceSnapshotName=P] " +
        "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " +
        "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        batch count for scan, note that"
        + " result row counts will no longer be actual number of rows when you use this option");
    System.err.println(" raw          includes raw scan if given in options");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on");
    System.err.println(" delimiter    the delimiter used in display around rowkey");
    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, "
        + "default value is 0 which disables the recompare.");
    System.err.println(" verbose      logs row keys of good rows");
    System.err.println(" sourceSnapshotName    Source snapshot name");
    System.err.println(" sourceSnapshotTmpDir  Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName      Peer snapshot name");
    System.err.println(" peerSnapshotTmpDir    Tmp location to restore peer table snapshot");
    System.err.println(" peerFSAddress         Peer cluster Hadoop FS address");
    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification,"
        + " must match the one given for replication");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}