/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This map-only job compares the data from a local table with a remote one. Every cell is
 * compared and must have exactly the same keys (even timestamp) as well as the same value. It is
 * possible to restrict the job by time range and families. The peer id that's provided must match
 * the one given when the replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason why a row is
 * different is shown in the map's log.
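 * <p>
 * A typical invocation mirrors the example printed by this class's usage help; the timestamps,
 * peer id and table name below are illustrative placeholders only:
 *
 * <pre>
 * $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \
 *     --starttime=1265875194289 --endtime=1265878794289 5 TestTable
 * </pre>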
 */
@InterfaceAudience.Private
public class VerifyReplication extends Configured implements Tool {

  private static final Logger LOG =
      LoggerFactory.getLogger(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
  long startTime = 0;
  long endTime = Long.MAX_VALUE;
  int batch = -1;
  int versions = -1;
  String tableName = null;
  String families = null;
  String delimiter = "";
  String peerId = null;
  String peerQuorumAddress = null;
  String rowPrefixes = null;
  int sleepMsBeforeReCompare = 0;
  boolean verbose = false;
  boolean includeDeletedCells = false;
  // Source table snapshot name
  String sourceSnapshotName = null;
  // Temp location in source cluster to restore source snapshot
  String sourceSnapshotTmpDir = null;
  // Peer table snapshot name
  String peerSnapshotName = null;
  // Temp location in peer cluster to restore peer snapshot
  String peerSnapshotTmpDir = null;
  // Peer cluster Hadoop FS address
  String peerFSAddress = null;
  // Peer cluster HBase root dir location
  String peerHBaseRootAddress = null;
  // Peer Table Name
  String peerTableName = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Map-only comparator for 2 tables
   */
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

    public enum Counters {
      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
    }

    private Connection sourceConnection;
    private Table sourceTable;
    private Connection replicatedConnection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
    private int sleepMsBeforeReCompare;
    private String delimiter = "";
    private boolean verbose = false;
    private int batch = -1;

    /**
     * Map method that compares every scanned row with the equivalent from
     * a distant cluster.
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
        Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0);
        delimiter = conf.get(NAME + ".delimiter", "");
        verbose = conf.getBoolean(NAME + ".verbose", false);
        batch = conf.getInt(NAME + ".batch", -1);
        final Scan scan = new Scan();
        if (batch > 0) {
          scan.setBatch(batch);
        }
        scan.setCacheBlocks(false);
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false);
        scan.setRaw(includeDeletedCells);
        String rowPrefixes = conf.get(NAME + ".rowPrefixes", null);
        setRowPrefixFilter(scan, rowPrefixes);
        scan.setTimeRange(startTime, endTime);
        int versions = conf.getInt(NAME + ".versions", -1);
        LOG.info("Setting number of versions inside map as: " + versions);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }
        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        sourceConnection = ConnectionFactory.createConnection(conf);
        sourceTable = sourceConnection.getTable(tableName);

        final InputSplit tableSplit = context.getInputSplit();

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
            zkClusterKey, PEER_CONFIG_PREFIX);

        String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString());
        TableName peerTableName = TableName.valueOf(peerName);
        replicatedConnection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = replicatedConnection.getTable(peerTableName);
        scan.setStartRow(value.getRow());

        byte[] endRow = null;
        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
              .getEndKey();
        } else {
          endRow = ((TableSplit) tableSplit).getEndRow();
        }

        scan.setStopRow(endRow);

        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
        if (peerSnapshotName != null) {
          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
          FileSystem.setDefaultUri(peerConf, peerFSAddress);
          CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" +
              peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) +
              " peerFSAddress:" + peerFSAddress);

          replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
        } else {
          replicatedScanner = replicatedTable.getScanner(scan);
        }
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // reach the region end of peer table, row only in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // rowkey is same, need to compare the content of the row
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key: " + delimiter
                  + Bytes.toStringBinary(value.getRow()) + delimiter);
            }
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // row only exists in source table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // row only exists in peer table
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      if (sleepMsBeforeReCompare > 0) {
        Threads.sleep(sleepMsBeforeReCompare);
        try {
          Result sourceResult = sourceTable.get(new Get(row.getRow()));
          Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
          Result.compareResults(sourceResult, replicatedResult);
          if (!sourceResult.isEmpty()) {
            context.getCounter(Counters.GOODROWS).increment(1);
            if (verbose) {
              LOG.info("Good row key (with recompare): " + delimiter
                  + Bytes.toStringBinary(row.getRow()) + delimiter);
            }
          }
          return;
        } catch (Exception e) {
          LOG.error("recompare fail after sleep, rowkey=" + delimiter +
              Bytes.toStringBinary(row.getRow()) + delimiter);
        }
      }
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toStringBinary(row.getRow()) +
          delimiter);
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
                currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("fail to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }

      if (sourceTable != null) {
        try {
          sourceTable.close();
        } catch (IOException e) {
          LOG.error("fail to close source table in cleanup", e);
        }
      }
      if (sourceConnection != null) {
        try {
          sourceConnection.close();
        } catch (Exception e) {
          LOG.error("fail to close source connection in cleanup", e);
        }
      }

      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("fail to close replicated table in cleanup", e);
        }
      }
      if (replicatedConnection != null) {
        try {
          replicatedConnection.close();
        } catch (Exception e) {
          LOG.error("fail to close replicated connection in cleanup", e);
        }
      }
    }
  }

  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
      final Configuration conf, String peerId) throws IOException {
    ZKWatcher localZKW = null;
    try {
      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
        }

        @Override
        public boolean isAborted() {
          return false;
        }
      });
      ReplicationPeerStorage storage =
          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
      return Pair.newPair(peerConfig,
          ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
    } catch (ReplicationException e) {
      throw new IOException("An error occurred while trying to connect to the remote peer cluster",
          e);
    } finally {
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }

  private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
      throws IOException {
    Configuration peerConf =
        HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
    FileSystem.setDefaultUri(peerConf, peerFSAddress);
    CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
    FileSystem fs = FileSystem.get(peerConf);
    RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf),
        new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
  }

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws java.io.IOException When setting up the job fails.
   */
  public Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
    conf.set(NAME + ".delimiter", delimiter);
    conf.setInt(NAME + ".batch", batch);
    conf.setBoolean(NAME + ".verbose", verbose);
    conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }
    if (rowPrefixes != null) {
      conf.set(NAME + ".rowPrefixes", rowPrefixes);
    }

    String peerQuorumAddress;
    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
    if (peerId != null) {
      peerConfigPair = getPeerQuorumConfig(conf, peerId);
      ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
      peerQuorumAddress = peerConfig.getClusterKey();
      LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
          peerConfig.getConfiguration());
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
      HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
          peerConfig.getConfiguration().entrySet());
    } else {
      assert this.peerQuorumAddress != null;
      peerQuorumAddress = this.peerQuorumAddress;
      LOG.info("Peer Quorum Address: " + peerQuorumAddress);
      conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    }

    if (peerTableName != null) {
      LOG.info("Peer Table Name: " + peerTableName);
      conf.set(NAME + ".peerTableName", peerTableName);
    }

    conf.setInt(NAME + ".versions", versions);
    LOG.info("Number of versions: " + versions);

    // Set Snapshot specific parameters
    if (peerSnapshotName != null) {
      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);

      // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
      // restore snapshot.
      Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
      peerSnapshotTmpDir = restoreDir.toString();
      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);

      conf.set(NAME + ".peerFSAddress", peerFSAddress);
      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);

      // This is to create HDFS delegation token for peer cluster in case of secured
      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR));
    }

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    scan.setRaw(includeDeletedCells);
    scan.setCacheBlocks(false);
    if (batch > 0) {
      scan.setBatch(batch);
    }
    if (versions >= 0) {
      scan.setMaxVersions(versions);
      LOG.info("Number of versions set to " + versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }

    setRowPrefixFilter(scan, rowPrefixes);

    if (sourceSnapshotName != null) {
      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
      LOG.info(
        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
        null, job, true, snapshotTempPath);
      restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
    } else {
      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
    }

    if (peerId != null) {
      assert peerConfigPair != null;
      Configuration peerClusterConf = peerConfigPair.getSecond();
      // Obtain the auth token from peer cluster
      TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
    }

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  private static void setRowPrefixFilter(Scan scan, String rowPrefixes) {
    if (rowPrefixes != null && !rowPrefixes.isEmpty()) {
      String[] rowPrefixArray = rowPrefixes.split(",");
      Arrays.sort(rowPrefixArray);
      FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
      for (String prefix : rowPrefixArray) {
        Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
        filterList.addFilter(filter);
      }
      scan.setFilter(filterList);
      byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]);
      byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]);
      setStartAndStopRows(scan, startPrefixRow, lastPrefixRow);
    }
  }

  private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) {
    scan.setStartRow(startPrefixRow);
    byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1),
        new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) });
    scan.setStopRow(stopRow);
  }

  public boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String includeDeletedCellsArgKey = "--raw";
        if (cmd.equals(includeDeletedCellsArgKey)) {
          includeDeletedCells = true;
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String batchArgKey = "--batch=";
        if (cmd.startsWith(batchArgKey)) {
          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        final String rowPrefixesKey = "--row-prefixes=";
        if (cmd.startsWith(rowPrefixesKey)) {
          rowPrefixes = cmd.substring(rowPrefixesKey.length());
          continue;
        }

        final String delimiterArgKey = "--delimiter=";
        if (cmd.startsWith(delimiterArgKey)) {
          delimiter = cmd.substring(delimiterArgKey.length());
          continue;
        }

        final String sleepToReCompareKey = "--recomparesleep=";
        if (cmd.startsWith(sleepToReCompareKey)) {
          sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
          continue;
        }
        final String verboseKey = "--verbose";
        if (cmd.startsWith(verboseKey)) {
          verbose = true;
          continue;
        }

        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
          continue;
        }

        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
        if (cmd.startsWith(peerSnapshotNameArgKey)) {
          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
          continue;
        }

        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
          continue;
        }

        final String peerFSAddressArgKey = "--peerFSAddress=";
        if (cmd.startsWith(peerFSAddressArgKey)) {
          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
          continue;
        }

        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
          continue;
        }

        final String peerTableNameArgKey = "--peerTableName=";
        if (cmd.startsWith(peerTableNameArgKey)) {
          peerTableName = cmd.substring(peerTableNameArgKey.length());
          continue;
        }

        if (cmd.startsWith("--")) {
          printUsage("Invalid argument '" + cmd + "'");
          return false;
        }

        if (i == args.length - 2) {
          if (isPeerQuorumAddress(cmd)) {
            peerQuorumAddress = cmd;
          } else {
            peerId = cmd;
          }
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }

      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
        printUsage("Source snapshot name and snapshot temp location should be provided"
            + " to use snapshots in source cluster");
        return false;
      }

      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
          || peerHBaseRootAddress != null) {
        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
            || peerHBaseRootAddress == null) {
          printUsage(
            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and "
                + "peer FSAddress should be provided to use snapshots in peer cluster");
          return false;
        }
      }

      // This is to avoid making recompare calls to source/peer tables when snapshots are used
      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
        printUsage(
          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
        return false;
      }

    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }

  private boolean isPeerQuorumAddress(String cmd) {
    try {
      ZKConfig.validateClusterKey(cmd);
    } catch (IOException e) {
      // not a quorum address
      return false;
    }
    return true;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]"
        + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] "
        + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] "
        + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] "
        + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" batch        batch count for scan, " +
        "note that result row counts will no longer be actual number of rows when you use this option");
    System.err.println(" raw          includes raw scan if given in options");
    System.err.println(" families     comma-separated list of families to copy");
    System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
    System.err.println(" delimiter    the delimiter used in display around rowkey");
    System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
        "default value is 0 which disables the recompare.");
    System.err.println(" verbose      logs row keys of good rows");
    System.err.println(" peerTableName  Peer Table Name");
    System.err.println(" sourceSnapshotName  Source Snapshot Name");
    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
    System.err.println(" peerSnapshotName  Peer Snapshot Name");
    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
    System.err.println(" peerFSAddress  Peer cluster Hadoop FS address");
    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
    System.err.println(" peerQuorumAddress   quorumAddress of the peer used for verification. The "
        + "format is zk_quorum:zk_port:zk_hbase_path");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
    System.err.println(" $ hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }

  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}