001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce.replication; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.UUID; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.conf.Configured; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Connection; 032import org.apache.hadoop.hbase.client.ConnectionFactory; 033import org.apache.hadoop.hbase.client.Get; 034import org.apache.hadoop.hbase.client.Put; 035import org.apache.hadoop.hbase.client.Result; 036import org.apache.hadoop.hbase.client.ResultScanner; 037import org.apache.hadoop.hbase.client.Scan; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.client.TableSnapshotScanner; 040import org.apache.hadoop.hbase.filter.Filter; 041import org.apache.hadoop.hbase.filter.FilterList; 042import org.apache.hadoop.hbase.filter.PrefixFilter; 043import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 044import org.apache.hadoop.hbase.mapreduce.TableInputFormat; 045import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 046import org.apache.hadoop.hbase.mapreduce.TableMapper; 047import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; 048import org.apache.hadoop.hbase.mapreduce.TableSplit; 049import org.apache.hadoop.hbase.replication.ReplicationException; 050import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 051import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 053import org.apache.hadoop.hbase.replication.ReplicationUtils; 054import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.CommonFSUtils; 057import org.apache.hadoop.hbase.util.Pair; 058import org.apache.hadoop.hbase.util.Threads; 059import org.apache.hadoop.hbase.zookeeper.ZKConfig; 060import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 061import org.apache.hadoop.mapreduce.InputSplit; 062import org.apache.hadoop.mapreduce.Job; 063import org.apache.hadoop.mapreduce.MRJobConfig; 064import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 065import org.apache.hadoop.util.Tool; 066import org.apache.hadoop.util.ToolRunner; 067import org.apache.yetus.audience.InterfaceAudience; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071/** 072 * This map-only job compares the data from a local table with a remote one. Every cell is compared 073 * and must have exactly the same keys (even timestamp) as well as same value. It is possible to 074 * restrict the job by time range and families. The peer id that's provided must match the one given 075 * when the replication stream was setup. 076 * <p> 077 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason for a why a row is 078 * different is shown in the map's log. 079 */ 080@InterfaceAudience.Private 081public class VerifyReplication extends Configured implements Tool { 082 083 private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class); 084 085 public final static String NAME = "verifyrep"; 086 private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; 087 long startTime = 0; 088 long endTime = Long.MAX_VALUE; 089 int batch = -1; 090 int versions = -1; 091 String tableName = null; 092 String families = null; 093 String delimiter = ""; 094 String peerId = null; 095 String peerQuorumAddress = null; 096 String rowPrefixes = null; 097 int sleepMsBeforeReCompare = 0; 098 boolean verbose = false; 099 boolean includeDeletedCells = false; 100 // Source table snapshot name 101 String sourceSnapshotName = null; 102 // Temp location in source cluster to restore source snapshot 103 String sourceSnapshotTmpDir = null; 104 // Peer table snapshot name 105 String peerSnapshotName = null; 106 // Temp location in peer cluster to restore peer snapshot 107 String peerSnapshotTmpDir = null; 108 // Peer cluster Hadoop FS address 109 String peerFSAddress = null; 110 // Peer cluster HBase root dir location 111 String peerHBaseRootAddress = null; 112 // Peer Table Name 113 String peerTableName = null; 114 115 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 116 117 /** 118 * Map-only comparator for 2 tables 119 */ 120 public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> { 121 122 public enum Counters { 123 GOODROWS, 124 BADROWS, 125 ONLY_IN_SOURCE_TABLE_ROWS, 126 ONLY_IN_PEER_TABLE_ROWS, 127 CONTENT_DIFFERENT_ROWS 128 } 129 130 private Connection sourceConnection; 131 private Table sourceTable; 132 private Connection replicatedConnection; 133 private Table replicatedTable; 134 private ResultScanner replicatedScanner; 135 private Result currentCompareRowInPeerTable; 136 private int sleepMsBeforeReCompare; 137 private String delimiter = ""; 138 private boolean verbose = false; 139 private int batch = -1; 140 141 /** 142 * Map method that compares every scanned row with the equivalent from a distant cluster. 143 * @param row The current table row key. 144 * @param value The columns. 145 * @param context The current context. 146 * @throws IOException When something is broken with the data. 147 */ 148 @Override 149 public void map(ImmutableBytesWritable row, final Result value, Context context) 150 throws IOException { 151 if (replicatedScanner == null) { 152 Configuration conf = context.getConfiguration(); 153 sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); 154 delimiter = conf.get(NAME + ".delimiter", ""); 155 verbose = conf.getBoolean(NAME + ".verbose", false); 156 batch = conf.getInt(NAME + ".batch", -1); 157 final Scan scan = new Scan(); 158 if (batch > 0) { 159 scan.setBatch(batch); 160 } 161 scan.setCacheBlocks(false); 162 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); 163 long startTime = conf.getLong(NAME + ".startTime", 0); 164 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); 165 String families = conf.get(NAME + ".families", null); 166 if (families != null) { 167 String[] fams = families.split(","); 168 for (String fam : fams) { 169 scan.addFamily(Bytes.toBytes(fam)); 170 } 171 } 172 boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); 173 scan.setRaw(includeDeletedCells); 174 String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); 175 setRowPrefixFilter(scan, rowPrefixes); 176 scan.setTimeRange(startTime, endTime); 177 int versions = conf.getInt(NAME + ".versions", -1); 178 LOG.info("Setting number of version inside map as: " + versions); 179 if (versions >= 0) { 180 scan.setMaxVersions(versions); 181 } 182 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); 183 sourceConnection = ConnectionFactory.createConnection(conf); 184 sourceTable = sourceConnection.getTable(tableName); 185 186 final InputSplit tableSplit = context.getInputSplit(); 187 188 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); 189 Configuration peerConf = 190 HBaseConfiguration.createClusterConf(conf, zkClusterKey, PEER_CONFIG_PREFIX); 191 192 String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString()); 193 TableName peerTableName = TableName.valueOf(peerName); 194 replicatedConnection = ConnectionFactory.createConnection(peerConf); 195 replicatedTable = replicatedConnection.getTable(peerTableName); 196 scan.setStartRow(value.getRow()); 197 198 byte[] endRow = null; 199 if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) { 200 endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo() 201 .getEndKey(); 202 } else { 203 endRow = ((TableSplit) tableSplit).getEndRow(); 204 } 205 206 scan.setStopRow(endRow); 207 208 String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null); 209 if (peerSnapshotName != null) { 210 String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null); 211 String peerFSAddress = conf.get(NAME + ".peerFSAddress", null); 212 String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null); 213 FileSystem.setDefaultUri(peerConf, peerFSAddress); 214 CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress)); 215 LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" 216 + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) 217 + " peerFSAddress:" + peerFSAddress); 218 219 replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf), 220 new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true); 221 } else { 222 replicatedScanner = replicatedTable.getScanner(scan); 223 } 224 currentCompareRowInPeerTable = replicatedScanner.next(); 225 } 226 while (true) { 227 if (currentCompareRowInPeerTable == null) { 228 // reach the region end of peer table, row only in source table 229 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); 230 break; 231 } 232 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); 233 if (rowCmpRet == 0) { 234 // rowkey is same, need to compare the content of the row 235 try { 236 Result.compareResults(value, currentCompareRowInPeerTable, false); 237 context.getCounter(Counters.GOODROWS).increment(1); 238 if (verbose) { 239 LOG.info( 240 "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); 241 } 242 } catch (Exception e) { 243 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value); 244 } 245 currentCompareRowInPeerTable = replicatedScanner.next(); 246 break; 247 } else if (rowCmpRet < 0) { 248 // row only exists in source table 249 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value); 250 break; 251 } else { 252 // row only exists in peer table 253 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, 254 currentCompareRowInPeerTable); 255 currentCompareRowInPeerTable = replicatedScanner.next(); 256 } 257 } 258 } 259 260 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) { 261 if (sleepMsBeforeReCompare > 0) { 262 Threads.sleep(sleepMsBeforeReCompare); 263 try { 264 Result sourceResult = sourceTable.get(new Get(row.getRow())); 265 Result replicatedResult = replicatedTable.get(new Get(row.getRow())); 266 Result.compareResults(sourceResult, replicatedResult, false); 267 if (!sourceResult.isEmpty()) { 268 context.getCounter(Counters.GOODROWS).increment(1); 269 if (verbose) { 270 LOG.info("Good row key (with recompare): " + delimiter 271 + Bytes.toStringBinary(row.getRow()) + delimiter); 272 } 273 } 274 return; 275 } catch (Exception e) { 276 LOG.error("recompare fail after sleep, rowkey=" + delimiter 277 + Bytes.toStringBinary(row.getRow()) + delimiter); 278 } 279 } 280 context.getCounter(counter).increment(1); 281 context.getCounter(Counters.BADROWS).increment(1); 282 LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) 283 + delimiter); 284 } 285 286 @Override 287 protected void cleanup(Context context) { 288 if (replicatedScanner != null) { 289 try { 290 while (currentCompareRowInPeerTable != null) { 291 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, 292 currentCompareRowInPeerTable); 293 currentCompareRowInPeerTable = replicatedScanner.next(); 294 } 295 } catch (Exception e) { 296 LOG.error("fail to scan peer table in cleanup", e); 297 } finally { 298 replicatedScanner.close(); 299 replicatedScanner = null; 300 } 301 } 302 303 if (sourceTable != null) { 304 try { 305 sourceTable.close(); 306 } catch (IOException e) { 307 LOG.error("fail to close source table in cleanup", e); 308 } 309 } 310 if (sourceConnection != null) { 311 try { 312 sourceConnection.close(); 313 } catch (Exception e) { 314 LOG.error("fail to close source connection in cleanup", e); 315 } 316 } 317 318 if (replicatedTable != null) { 319 try { 320 replicatedTable.close(); 321 } catch (Exception e) { 322 LOG.error("fail to close replicated table in cleanup", e); 323 } 324 } 325 if (replicatedConnection != null) { 326 try { 327 replicatedConnection.close(); 328 } catch (Exception e) { 329 LOG.error("fail to close replicated connection in cleanup", e); 330 } 331 } 332 } 333 } 334 335 private static Pair<ReplicationPeerConfig, Configuration> 336 getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException { 337 ZKWatcher localZKW = null; 338 try { 339 localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() { 340 @Override 341 public void abort(String why, Throwable e) { 342 } 343 344 @Override 345 public boolean isAborted() { 346 return false; 347 } 348 }); 349 ReplicationPeerStorage storage = 350 ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf); 351 ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId); 352 return Pair.newPair(peerConfig, 353 ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf)); 354 } catch (ReplicationException e) { 355 throw new IOException("An error occurred while trying to connect to the remote peer cluster", 356 e); 357 } finally { 358 if (localZKW != null) { 359 localZKW.close(); 360 } 361 } 362 } 363 364 private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress) 365 throws IOException { 366 Configuration peerConf = 367 HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); 368 FileSystem.setDefaultUri(peerConf, peerFSAddress); 369 CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress)); 370 FileSystem fs = FileSystem.get(peerConf); 371 RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf), 372 new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName); 373 } 374 375 /** 376 * Sets up the actual job. 377 * @param conf The current configuration. 378 * @param args The command line parameters. 379 * @return The newly created job. 380 * @throws java.io.IOException When setting up the job fails. 381 */ 382 public Job createSubmittableJob(Configuration conf, String[] args) throws IOException { 383 if (!doCommandLine(args)) { 384 return null; 385 } 386 conf.set(NAME + ".tableName", tableName); 387 conf.setLong(NAME + ".startTime", startTime); 388 conf.setLong(NAME + ".endTime", endTime); 389 conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare); 390 conf.set(NAME + ".delimiter", delimiter); 391 conf.setInt(NAME + ".batch", batch); 392 conf.setBoolean(NAME + ".verbose", verbose); 393 conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells); 394 if (families != null) { 395 conf.set(NAME + ".families", families); 396 } 397 if (rowPrefixes != null) { 398 conf.set(NAME + ".rowPrefixes", rowPrefixes); 399 } 400 401 String peerQuorumAddress; 402 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null; 403 if (peerId != null) { 404 peerConfigPair = getPeerQuorumConfig(conf, peerId); 405 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); 406 peerQuorumAddress = peerConfig.getClusterKey(); 407 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " 408 + peerConfig.getConfiguration()); 409 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); 410 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, 411 peerConfig.getConfiguration().entrySet()); 412 } else { 413 assert this.peerQuorumAddress != null; 414 peerQuorumAddress = this.peerQuorumAddress; 415 LOG.info("Peer Quorum Address: " + peerQuorumAddress); 416 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); 417 } 418 419 if (peerTableName != null) { 420 LOG.info("Peer Table Name: " + peerTableName); 421 conf.set(NAME + ".peerTableName", peerTableName); 422 } 423 424 conf.setInt(NAME + ".versions", versions); 425 LOG.info("Number of version: " + versions); 426 427 // Set Snapshot specific parameters 428 if (peerSnapshotName != null) { 429 conf.set(NAME + ".peerSnapshotName", peerSnapshotName); 430 431 // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to 432 // restore snapshot. 433 Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString()); 434 peerSnapshotTmpDir = restoreDir.toString(); 435 conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir); 436 437 conf.set(NAME + ".peerFSAddress", peerFSAddress); 438 conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress); 439 440 // This is to create HDFS delegation token for peer cluster in case of secured 441 conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR)); 442 } 443 444 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 445 job.setJarByClass(VerifyReplication.class); 446 447 Scan scan = new Scan(); 448 scan.setTimeRange(startTime, endTime); 449 scan.setRaw(includeDeletedCells); 450 scan.setCacheBlocks(false); 451 if (batch > 0) { 452 scan.setBatch(batch); 453 } 454 if (versions >= 0) { 455 scan.setMaxVersions(versions); 456 LOG.info("Number of versions set to " + versions); 457 } 458 if (families != null) { 459 String[] fams = families.split(","); 460 for (String fam : fams) { 461 scan.addFamily(Bytes.toBytes(fam)); 462 } 463 } 464 465 setRowPrefixFilter(scan, rowPrefixes); 466 467 if (sourceSnapshotName != null) { 468 Path snapshotTempPath = new Path(sourceSnapshotTmpDir); 469 LOG.info( 470 "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir); 471 TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null, 472 null, job, true, snapshotTempPath); 473 restoreSnapshotForPeerCluster(conf, peerQuorumAddress); 474 } else { 475 TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); 476 } 477 478 Configuration peerClusterConf; 479 if (peerId != null) { 480 assert peerConfigPair != null; 481 peerClusterConf = peerConfigPair.getSecond(); 482 } else { 483 peerClusterConf = 484 HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); 485 } 486 // Obtain the auth token from peer cluster 487 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); 488 489 job.setOutputFormatClass(NullOutputFormat.class); 490 job.setNumReduceTasks(0); 491 return job; 492 } 493 494 private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { 495 if (rowPrefixes != null && !rowPrefixes.isEmpty()) { 496 String[] rowPrefixArray = rowPrefixes.split(","); 497 Arrays.sort(rowPrefixArray); 498 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); 499 for (String prefix : rowPrefixArray) { 500 Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); 501 filterList.addFilter(filter); 502 } 503 scan.setFilter(filterList); 504 byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); 505 byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]); 506 setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); 507 } 508 } 509 510 private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { 511 scan.setStartRow(startPrefixRow); 512 byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), 513 new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) }); 514 scan.setStopRow(stopRow); 515 } 516 517 public boolean doCommandLine(final String[] args) { 518 if (args.length < 2) { 519 printUsage(null); 520 return false; 521 } 522 try { 523 for (int i = 0; i < args.length; i++) { 524 String cmd = args[i]; 525 if (cmd.equals("-h") || cmd.startsWith("--h")) { 526 printUsage(null); 527 return false; 528 } 529 530 final String startTimeArgKey = "--starttime="; 531 if (cmd.startsWith(startTimeArgKey)) { 532 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); 533 continue; 534 } 535 536 final String endTimeArgKey = "--endtime="; 537 if (cmd.startsWith(endTimeArgKey)) { 538 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); 539 continue; 540 } 541 542 final String includeDeletedCellsArgKey = "--raw"; 543 if (cmd.equals(includeDeletedCellsArgKey)) { 544 includeDeletedCells = true; 545 continue; 546 } 547 548 final String versionsArgKey = "--versions="; 549 if (cmd.startsWith(versionsArgKey)) { 550 versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); 551 continue; 552 } 553 554 final String batchArgKey = "--batch="; 555 if (cmd.startsWith(batchArgKey)) { 556 batch = Integer.parseInt(cmd.substring(batchArgKey.length())); 557 continue; 558 } 559 560 final String familiesArgKey = "--families="; 561 if (cmd.startsWith(familiesArgKey)) { 562 families = cmd.substring(familiesArgKey.length()); 563 continue; 564 } 565 566 final String rowPrefixesKey = "--row-prefixes="; 567 if (cmd.startsWith(rowPrefixesKey)) { 568 rowPrefixes = cmd.substring(rowPrefixesKey.length()); 569 continue; 570 } 571 572 final String delimiterArgKey = "--delimiter="; 573 if (cmd.startsWith(delimiterArgKey)) { 574 delimiter = cmd.substring(delimiterArgKey.length()); 575 continue; 576 } 577 578 final String sleepToReCompareKey = "--recomparesleep="; 579 if (cmd.startsWith(sleepToReCompareKey)) { 580 sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); 581 continue; 582 } 583 final String verboseKey = "--verbose"; 584 if (cmd.startsWith(verboseKey)) { 585 verbose = true; 586 continue; 587 } 588 589 final String sourceSnapshotNameArgKey = "--sourceSnapshotName="; 590 if (cmd.startsWith(sourceSnapshotNameArgKey)) { 591 sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length()); 592 continue; 593 } 594 595 final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir="; 596 if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) { 597 sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length()); 598 continue; 599 } 600 601 final String peerSnapshotNameArgKey = "--peerSnapshotName="; 602 if (cmd.startsWith(peerSnapshotNameArgKey)) { 603 peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length()); 604 continue; 605 } 606 607 final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir="; 608 if (cmd.startsWith(peerSnapshotTmpDirArgKey)) { 609 peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length()); 610 continue; 611 } 612 613 final String peerFSAddressArgKey = "--peerFSAddress="; 614 if (cmd.startsWith(peerFSAddressArgKey)) { 615 peerFSAddress = cmd.substring(peerFSAddressArgKey.length()); 616 continue; 617 } 618 619 final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress="; 620 if (cmd.startsWith(peerHBaseRootAddressArgKey)) { 621 peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length()); 622 continue; 623 } 624 625 final String peerTableNameArgKey = "--peerTableName="; 626 if (cmd.startsWith(peerTableNameArgKey)) { 627 peerTableName = cmd.substring(peerTableNameArgKey.length()); 628 continue; 629 } 630 631 if (cmd.startsWith("--")) { 632 printUsage("Invalid argument '" + cmd + "'"); 633 return false; 634 } 635 636 if (i == args.length - 2) { 637 if (isPeerQuorumAddress(cmd)) { 638 peerQuorumAddress = cmd; 639 } else { 640 peerId = cmd; 641 } 642 } 643 644 if (i == args.length - 1) { 645 tableName = cmd; 646 } 647 } 648 649 if ( 650 (sourceSnapshotName != null && sourceSnapshotTmpDir == null) 651 || (sourceSnapshotName == null && sourceSnapshotTmpDir != null) 652 ) { 653 printUsage("Source snapshot name and snapshot temp location should be provided" 654 + " to use snapshots in source cluster"); 655 return false; 656 } 657 658 if ( 659 peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null 660 || peerHBaseRootAddress != null 661 ) { 662 if ( 663 peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null 664 || peerHBaseRootAddress == null 665 ) { 666 printUsage( 667 "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " 668 + "peer FSAddress should be provided to use snapshots in peer cluster"); 669 return false; 670 } 671 } 672 673 // This is to avoid making recompare calls to source/peer tables when snapshots are used 674 if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { 675 printUsage( 676 "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are" 677 + " immutable"); 678 return false; 679 } 680 681 } catch (Exception e) { 682 e.printStackTrace(); 683 printUsage("Can't start because " + e.getMessage()); 684 return false; 685 } 686 return true; 687 } 688 689 private boolean isPeerQuorumAddress(String cmd) { 690 try { 691 ZKConfig.validateClusterKey(cmd); 692 } catch (IOException e) { 693 // not a quorum address 694 return false; 695 } 696 return true; 697 } 698 699 /* 700 * @param errorMsg Error message. Can be null. 701 */ 702 private static void printUsage(final String errorMsg) { 703 if (errorMsg != null && errorMsg.length() > 0) { 704 System.err.println("ERROR: " + errorMsg); 705 } 706 System.err.println("Usage: verifyrep [--starttime=X]" 707 + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " 708 + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " 709 + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " 710 + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>"); 711 System.err.println(); 712 System.err.println("Options:"); 713 System.err.println(" starttime beginning of the time range"); 714 System.err.println(" without endtime means from starttime to forever"); 715 System.err.println(" endtime end of the time range"); 716 System.err.println(" versions number of cell versions to verify"); 717 System.err.println(" batch batch count for scan, note that" 718 + " result row counts will no longer be actual number of rows when you use this option"); 719 System.err.println(" raw includes raw scan if given in options"); 720 System.err.println(" families comma-separated list of families to copy"); 721 System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); 722 System.err.println(" delimiter the delimiter used in display around rowkey"); 723 System.err.println(" recomparesleep milliseconds to sleep before recompare row, " 724 + "default value is 0 which disables the recompare."); 725 System.err.println(" verbose logs row keys of good rows"); 726 System.err.println(" peerTableName Peer Table Name"); 727 System.err.println(" sourceSnapshotName Source Snapshot Name"); 728 System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); 729 System.err.println(" peerSnapshotName Peer Snapshot Name"); 730 System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); 731 System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); 732 System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); 733 System.err.println(); 734 System.err.println("Args:"); 735 System.err.println(" peerid Id of the peer used for verification," 736 + " must match the one given for replication"); 737 System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The " 738 + "format is zk_quorum:zk_port:zk_hbase_path"); 739 System.err.println(" tablename Name of the table to verify"); 740 System.err.println(); 741 System.err.println("Examples:"); 742 System.err 743 .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); 744 System.err 745 .println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" 746 + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); 747 System.err.println(); 748 System.err.println( 749 " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b"); 750 System.err.println(" Assume quorum address for cluster-b is" 751 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b"); 752 System.err 753 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 754 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 755 + "2181:/cluster-b \\\n" + " TestTable"); 756 System.err.println(); 757 System.err 758 .println(" To verify the data in TestTable between the secured cluster runs VerifyReplication" 759 + " and insecure cluster-b"); 760 System.err 761 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 762 + " -D verifyrep.peer.hbase.security.authentication=simple \\\n" 763 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 764 + "2181:/cluster-b \\\n" + " TestTable"); 765 System.err.println(); 766 System.err.println(" To verify the data in TestTable between" 767 + " the secured cluster runs VerifyReplication and secured cluster-b"); 768 System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E" 769 + ", for master and regionserver kerberos principal from another cluster"); 770 System.err 771 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 772 + " -D verifyrep.peer.hbase.regionserver.kerberos.principal=" 773 + "cluster-b/_HOST@EXAMPLE.COM \\\n" 774 + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" 775 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 776 + "2181:/cluster-b \\\n" + " TestTable"); 777 System.err.println(); 778 System.err.println( 779 " To verify the data in TestTable between the insecure cluster runs VerifyReplication" 780 + " and secured cluster-b"); 781 System.err 782 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 783 + " -D verifyrep.peer.hbase.security.authentication=kerberos \\\n" 784 + " -D verifyrep.peer.hbase.regionserver.kerberos.principal=" 785 + "cluster-b/_HOST@EXAMPLE.COM \\\n" 786 + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" 787 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 788 + "2181:/cluster-b \\\n" + " TestTable"); 789 } 790 791 @Override 792 public int run(String[] args) throws Exception { 793 Configuration conf = this.getConf(); 794 Job job = createSubmittableJob(conf, args); 795 if (job != null) { 796 return job.waitForCompletion(true) ? 0 : 1; 797 } 798 return 1; 799 } 800 801 /** 802 * Main entry point. 803 * @param args The command line parameters. 804 * @throws Exception When running the job fails. 805 */ 806 public static void main(String[] args) throws Exception { 807 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args); 808 System.exit(res); 809 } 810}