001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce.replication; 019 020import java.io.IOException; 021import java.net.URI; 022import java.util.Arrays; 023import java.util.List; 024import java.util.UUID; 025import java.util.concurrent.SynchronousQueue; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 028import java.util.concurrent.TimeUnit; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.client.ConnectionFactory; 039import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; 040import org.apache.hadoop.hbase.client.Put; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.ResultScanner; 043import org.apache.hadoop.hbase.client.Scan; 044import org.apache.hadoop.hbase.client.Table; 045import org.apache.hadoop.hbase.client.TableSnapshotScanner; 046import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 047import org.apache.hadoop.hbase.filter.Filter; 048import org.apache.hadoop.hbase.filter.FilterList; 049import org.apache.hadoop.hbase.filter.PrefixFilter; 050import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 051import org.apache.hadoop.hbase.mapreduce.TableInputFormat; 052import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 053import org.apache.hadoop.hbase.mapreduce.TableMapper; 054import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat; 055import org.apache.hadoop.hbase.mapreduce.TableSplit; 056import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters; 057import org.apache.hadoop.hbase.replication.ReplicationException; 058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 059import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 060import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 061import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.Pair; 065import org.apache.hadoop.hbase.util.Strings; 066import org.apache.hadoop.hbase.zookeeper.ZKConfig; 067import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 068import org.apache.hadoop.mapreduce.InputSplit; 069import org.apache.hadoop.mapreduce.Job; 070import org.apache.hadoop.mapreduce.MRJobConfig; 071import org.apache.hadoop.mapreduce.Mapper; 072import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 073import org.apache.hadoop.util.Tool; 074import org.apache.hadoop.util.ToolRunner; 075import org.apache.yetus.audience.InterfaceAudience; 076import org.slf4j.Logger; 077import org.slf4j.LoggerFactory; 078 079/** 080 * This map-only job compares the data from a local table with a remote one. Every cell is compared 081 * and must have exactly the same keys (even timestamp) as well as same value. It is possible to 082 * restrict the job by time range and families. The peer id that's provided must match the one given 083 * when the replication stream was setup. 084 * <p> 085 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason for a why a row is 086 * different is shown in the map's log. 087 */ 088@InterfaceAudience.Private 089public class VerifyReplication extends Configured implements Tool { 090 091 private static final Logger LOG = LoggerFactory.getLogger(VerifyReplication.class); 092 093 public final static String NAME = "verifyrep"; 094 private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; 095 private static ThreadPoolExecutor reCompareExecutor = null; 096 int reCompareTries = 0; 097 int reCompareBackoffExponent = 0; 098 int reCompareThreads = 0; 099 int sleepMsBeforeReCompare = 0; 100 long startTime = 0; 101 long endTime = Long.MAX_VALUE; 102 int batch = -1; 103 int versions = -1; 104 String tableName = null; 105 String families = null; 106 String delimiter = ""; 107 String peerId = null; 108 String peerQuorumAddress = null; 109 String rowPrefixes = null; 110 boolean verbose = false; 111 boolean includeDeletedCells = false; 112 // Source table snapshot name 113 String sourceSnapshotName = null; 114 // Temp location in source cluster to restore source snapshot 115 String sourceSnapshotTmpDir = null; 116 // Peer table snapshot name 117 String peerSnapshotName = null; 118 // Temp location in peer cluster to restore peer snapshot 119 String peerSnapshotTmpDir = null; 120 // Peer cluster Hadoop FS address 121 String peerFSAddress = null; 122 // Peer cluster HBase root dir location 123 String peerHBaseRootAddress = null; 124 // Peer Table Name 125 String peerTableName = null; 126 127 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; 128 129 /** 130 * Map-only comparator for 2 tables 131 */ 132 public static class Verifier extends TableMapper<ImmutableBytesWritable, Put> { 133 134 public enum Counters { 135 GOODROWS, 136 BADROWS, 137 ONLY_IN_SOURCE_TABLE_ROWS, 138 ONLY_IN_PEER_TABLE_ROWS, 139 CONTENT_DIFFERENT_ROWS, 140 RECOMPARES, 141 MAIN_THREAD_RECOMPARES, 142 SOURCE_ROW_CHANGED, 143 PEER_ROW_CHANGED, 144 FAILED_RECOMPARE 145 } 146 147 private Connection sourceConnection; 148 private Table sourceTable; 149 private Connection replicatedConnection; 150 private Table replicatedTable; 151 private ResultScanner replicatedScanner; 152 private Result currentCompareRowInPeerTable; 153 private Scan tableScan; 154 private int reCompareTries; 155 private int reCompareBackoffExponent; 156 private int sleepMsBeforeReCompare; 157 private String delimiter = ""; 158 private boolean verbose = false; 159 private int batch = -1; 160 161 /** 162 * Map method that compares every scanned row with the equivalent from a distant cluster. 163 * @param row The current table row key. 164 * @param value The columns. 165 * @param context The current context. 166 * @throws IOException When something is broken with the data. 167 */ 168 @Override 169 public void map(ImmutableBytesWritable row, final Result value, Context context) 170 throws IOException { 171 if (replicatedScanner == null) { 172 Configuration conf = context.getConfiguration(); 173 reCompareTries = conf.getInt(NAME + ".recompareTries", 0); 174 reCompareBackoffExponent = conf.getInt(NAME + ".recompareBackoffExponent", 1); 175 sleepMsBeforeReCompare = conf.getInt(NAME + ".sleepMsBeforeReCompare", 0); 176 if (sleepMsBeforeReCompare > 0) { 177 reCompareTries = Math.max(reCompareTries, 1); 178 } 179 delimiter = conf.get(NAME + ".delimiter", ""); 180 verbose = conf.getBoolean(NAME + ".verbose", false); 181 batch = conf.getInt(NAME + ".batch", -1); 182 final Scan scan = new Scan(); 183 if (batch > 0) { 184 scan.setBatch(batch); 185 } 186 scan.setCacheBlocks(false); 187 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1)); 188 long startTime = conf.getLong(NAME + ".startTime", 0); 189 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE); 190 String families = conf.get(NAME + ".families", null); 191 if (families != null) { 192 String[] fams = families.split(","); 193 for (String fam : fams) { 194 scan.addFamily(Bytes.toBytes(fam)); 195 } 196 } 197 boolean includeDeletedCells = conf.getBoolean(NAME + ".includeDeletedCells", false); 198 scan.setRaw(includeDeletedCells); 199 String rowPrefixes = conf.get(NAME + ".rowPrefixes", null); 200 setRowPrefixFilter(scan, rowPrefixes); 201 scan.setTimeRange(startTime, endTime); 202 int versions = conf.getInt(NAME + ".versions", -1); 203 LOG.info("Setting number of version inside map as: " + versions); 204 if (versions >= 0) { 205 scan.readVersions(versions); 206 } 207 int reCompareThreads = conf.getInt(NAME + ".recompareThreads", 0); 208 reCompareExecutor = buildReCompareExecutor(reCompareThreads, context); 209 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); 210 sourceConnection = ConnectionFactory.createConnection(conf); 211 sourceTable = sourceConnection.getTable(tableName); 212 tableScan = scan; 213 214 final InputSplit tableSplit = context.getInputSplit(); 215 216 String peerQuorumAddress = conf.get(NAME + ".peerQuorumAddress"); 217 URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); 218 Configuration peerConf; 219 if (connectionUri != null) { 220 peerConf = HBaseConfiguration.create(conf); 221 } else { 222 peerConf = 223 HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); 224 } 225 String peerName = peerConf.get(NAME + ".peerTableName", tableName.getNameAsString()); 226 TableName peerTableName = TableName.valueOf(peerName); 227 replicatedConnection = ConnectionFactory.createConnection(connectionUri, peerConf); 228 replicatedTable = replicatedConnection.getTable(peerTableName); 229 scan.withStartRow(value.getRow()); 230 231 byte[] endRow = null; 232 if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) { 233 endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegion() 234 .getEndKey(); 235 } else { 236 endRow = ((TableSplit) tableSplit).getEndRow(); 237 } 238 239 scan.withStopRow(endRow); 240 241 String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null); 242 if (peerSnapshotName != null) { 243 String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null); 244 String peerFSAddress = conf.get(NAME + ".peerFSAddress", null); 245 String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null); 246 FileSystem.setDefaultUri(peerConf, peerFSAddress); 247 CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress)); 248 LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:" 249 + peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf) 250 + " peerFSAddress:" + peerFSAddress); 251 252 replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf), 253 new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true); 254 } else { 255 replicatedScanner = replicatedTable.getScanner(scan); 256 } 257 currentCompareRowInPeerTable = replicatedScanner.next(); 258 } 259 while (true) { 260 if (currentCompareRowInPeerTable == null) { 261 // reach the region end of peer table, row only in source table 262 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); 263 break; 264 } 265 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow()); 266 if (rowCmpRet == 0) { 267 // rowkey is same, need to compare the content of the row 268 try { 269 Result.compareResults(value, currentCompareRowInPeerTable, false); 270 context.getCounter(Counters.GOODROWS).increment(1); 271 if (verbose) { 272 LOG.info( 273 "Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter); 274 } 275 } catch (Exception e) { 276 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value, 277 currentCompareRowInPeerTable); 278 } 279 currentCompareRowInPeerTable = replicatedScanner.next(); 280 break; 281 } else if (rowCmpRet < 0) { 282 // row only exists in source table 283 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value, null); 284 break; 285 } else { 286 // row only exists in peer table 287 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, 288 currentCompareRowInPeerTable); 289 currentCompareRowInPeerTable = replicatedScanner.next(); 290 } 291 } 292 } 293 294 @SuppressWarnings("FutureReturnValueIgnored") 295 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row, 296 Result replicatedRow) { 297 byte[] rowKey = getRow(row, replicatedRow); 298 if (reCompareTries == 0) { 299 context.getCounter(counter).increment(1); 300 context.getCounter(Counters.BADROWS).increment(1); 301 LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(rowKey), delimiter); 302 return; 303 } 304 305 VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, 306 row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, 307 reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); 308 309 if (reCompareExecutor == null) { 310 runnable.run(); 311 return; 312 } 313 314 reCompareExecutor.submit(runnable); 315 } 316 317 @Override 318 protected void cleanup(Context context) { 319 if (reCompareExecutor != null && !reCompareExecutor.isShutdown()) { 320 reCompareExecutor.shutdown(); 321 try { 322 boolean terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); 323 if (!terminated) { 324 List<Runnable> queue = reCompareExecutor.shutdownNow(); 325 for (Runnable runnable : queue) { 326 ((VerifyReplicationRecompareRunnable) runnable).fail(); 327 } 328 329 terminated = reCompareExecutor.awaitTermination(1, TimeUnit.MINUTES); 330 331 if (!terminated) { 332 int activeCount = Math.max(1, reCompareExecutor.getActiveCount()); 333 LOG.warn("Found {} possible recompares still running in the executable" 334 + " incrementing BADROWS and FAILED_RECOMPARE", activeCount); 335 context.getCounter(Counters.BADROWS).increment(activeCount); 336 context.getCounter(Counters.FAILED_RECOMPARE).increment(activeCount); 337 } 338 } 339 } catch (InterruptedException e) { 340 throw new RuntimeException("Failed to await executor termination in cleanup", e); 341 } 342 } 343 if (replicatedScanner != null) { 344 try { 345 while (currentCompareRowInPeerTable != null) { 346 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS, null, 347 currentCompareRowInPeerTable); 348 currentCompareRowInPeerTable = replicatedScanner.next(); 349 } 350 } catch (Exception e) { 351 LOG.error("fail to scan peer table in cleanup", e); 352 } finally { 353 replicatedScanner.close(); 354 replicatedScanner = null; 355 } 356 } 357 358 if (sourceTable != null) { 359 try { 360 sourceTable.close(); 361 } catch (IOException e) { 362 LOG.error("fail to close source table in cleanup", e); 363 } 364 } 365 if (sourceConnection != null) { 366 try { 367 sourceConnection.close(); 368 } catch (Exception e) { 369 LOG.error("fail to close source connection in cleanup", e); 370 } 371 } 372 373 if (replicatedTable != null) { 374 try { 375 replicatedTable.close(); 376 } catch (Exception e) { 377 LOG.error("fail to close replicated table in cleanup", e); 378 } 379 } 380 if (replicatedConnection != null) { 381 try { 382 replicatedConnection.close(); 383 } catch (Exception e) { 384 LOG.error("fail to close replicated connection in cleanup", e); 385 } 386 } 387 } 388 } 389 390 private static Pair<ReplicationPeerConfig, Configuration> 391 getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException { 392 ZKWatcher localZKW = null; 393 try { 394 localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() { 395 @Override 396 public void abort(String why, Throwable e) { 397 } 398 399 @Override 400 public boolean isAborted() { 401 return false; 402 } 403 }); 404 ReplicationPeerStorage storage = 405 ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf); 406 ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId); 407 return Pair.newPair(peerConfig, 408 ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peerConfig)); 409 } catch (ReplicationException e) { 410 throw new IOException("An error occurred while trying to connect to the remote peer cluster", 411 e); 412 } finally { 413 if (localZKW != null) { 414 localZKW.close(); 415 } 416 } 417 } 418 419 private Configuration applyURIConf(Configuration conf, URI uri) { 420 Configuration peerConf = HBaseConfiguration.subset(conf, PEER_CONFIG_PREFIX); 421 HBaseConfiguration.merge(peerConf, conf); 422 Strings.applyURIQueriesToConf(uri, peerConf); 423 return peerConf; 424 } 425 426 private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress) 427 throws IOException { 428 URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); 429 Configuration peerConf; 430 if (uri != null) { 431 peerConf = applyURIConf(conf, uri); 432 } else { 433 peerConf = HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX); 434 } 435 FileSystem.setDefaultUri(peerConf, peerFSAddress); 436 CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress)); 437 FileSystem fs = FileSystem.get(peerConf); 438 RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf), 439 new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName); 440 } 441 442 /** 443 * Sets up the actual job. 444 * @param conf The current configuration. 445 * @param args The command line parameters. 446 * @return The newly created job. 447 * @throws java.io.IOException When setting up the job fails. 448 */ 449 public Job createSubmittableJob(Configuration conf, String[] args) throws IOException { 450 if (!doCommandLine(args)) { 451 return null; 452 } 453 conf.set(NAME + ".tableName", tableName); 454 conf.setLong(NAME + ".startTime", startTime); 455 conf.setLong(NAME + ".endTime", endTime); 456 conf.setInt(NAME + ".sleepMsBeforeReCompare", sleepMsBeforeReCompare); 457 conf.set(NAME + ".delimiter", delimiter); 458 conf.setInt(NAME + ".batch", batch); 459 conf.setBoolean(NAME + ".verbose", verbose); 460 conf.setBoolean(NAME + ".includeDeletedCells", includeDeletedCells); 461 if (families != null) { 462 conf.set(NAME + ".families", families); 463 } 464 if (rowPrefixes != null) { 465 conf.set(NAME + ".rowPrefixes", rowPrefixes); 466 } 467 468 String peerQuorumAddress; 469 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null; 470 if (peerId != null) { 471 peerConfigPair = getPeerQuorumConfig(conf, peerId); 472 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); 473 peerQuorumAddress = peerConfig.getClusterKey(); 474 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " 475 + peerConfig.getConfiguration()); 476 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); 477 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, 478 peerConfig.getConfiguration().entrySet()); 479 } else { 480 assert this.peerQuorumAddress != null; 481 peerQuorumAddress = this.peerQuorumAddress; 482 LOG.info("Peer Quorum Address: " + peerQuorumAddress); 483 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); 484 } 485 486 if (peerTableName != null) { 487 LOG.info("Peer Table Name: " + peerTableName); 488 conf.set(NAME + ".peerTableName", peerTableName); 489 } 490 491 conf.setInt(NAME + ".versions", versions); 492 LOG.info("Number of version: " + versions); 493 494 conf.setInt(NAME + ".recompareTries", reCompareTries); 495 conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent); 496 conf.setInt(NAME + ".recompareThreads", reCompareThreads); 497 498 // Set Snapshot specific parameters 499 if (peerSnapshotName != null) { 500 conf.set(NAME + ".peerSnapshotName", peerSnapshotName); 501 502 // for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to 503 // restore snapshot. 504 Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString()); 505 peerSnapshotTmpDir = restoreDir.toString(); 506 conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir); 507 508 conf.set(NAME + ".peerFSAddress", peerFSAddress); 509 conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress); 510 511 // This is to create HDFS delegation token for peer cluster in case of secured 512 conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress, conf.get(HConstants.HBASE_DIR)); 513 } 514 515 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); 516 job.setJarByClass(VerifyReplication.class); 517 518 Scan scan = new Scan(); 519 scan.setTimeRange(startTime, endTime); 520 scan.setRaw(includeDeletedCells); 521 scan.setCacheBlocks(false); 522 if (batch > 0) { 523 scan.setBatch(batch); 524 } 525 if (versions >= 0) { 526 scan.readVersions(versions); 527 LOG.info("Number of versions set to " + versions); 528 } 529 if (families != null) { 530 String[] fams = families.split(","); 531 for (String fam : fams) { 532 scan.addFamily(Bytes.toBytes(fam)); 533 } 534 } 535 536 setRowPrefixFilter(scan, rowPrefixes); 537 538 if (sourceSnapshotName != null) { 539 Path snapshotTempPath = new Path(sourceSnapshotTmpDir); 540 LOG.info( 541 "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir); 542 TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null, 543 null, job, true, snapshotTempPath); 544 restoreSnapshotForPeerCluster(conf, peerQuorumAddress); 545 } else { 546 TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); 547 } 548 549 Configuration peerClusterBaseConf; 550 if (peerId != null) { 551 assert peerConfigPair != null; 552 peerClusterBaseConf = peerConfigPair.getSecond(); 553 } else { 554 peerClusterBaseConf = conf; 555 } 556 Configuration peerClusterConf; 557 URI uri = ConnectionRegistryFactory.tryParseAsConnectionURI(peerQuorumAddress); 558 if (uri != null) { 559 peerClusterConf = new Configuration(peerClusterBaseConf); 560 applyURIConf(peerClusterConf, uri); 561 } else { 562 peerClusterConf = HBaseConfiguration.createClusterConf(peerClusterBaseConf, peerQuorumAddress, 563 PEER_CONFIG_PREFIX); 564 } 565 // Obtain the auth token from peer cluster 566 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf, uri); 567 568 job.setOutputFormatClass(NullOutputFormat.class); 569 job.setNumReduceTasks(0); 570 return job; 571 } 572 573 protected static byte[] getRow(Result sourceResult, Result replicatedResult) { 574 if (sourceResult != null) { 575 return sourceResult.getRow(); 576 } else if (replicatedResult != null) { 577 return replicatedResult.getRow(); 578 } 579 throw new RuntimeException("Both sourceResult and replicatedResult are null!"); 580 } 581 582 private static void setRowPrefixFilter(Scan scan, String rowPrefixes) { 583 if (rowPrefixes != null && !rowPrefixes.isEmpty()) { 584 String[] rowPrefixArray = rowPrefixes.split(","); 585 Arrays.sort(rowPrefixArray); 586 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); 587 for (String prefix : rowPrefixArray) { 588 Filter filter = new PrefixFilter(Bytes.toBytes(prefix)); 589 filterList.addFilter(filter); 590 } 591 scan.setFilter(filterList); 592 byte[] startPrefixRow = Bytes.toBytes(rowPrefixArray[0]); 593 byte[] lastPrefixRow = Bytes.toBytes(rowPrefixArray[rowPrefixArray.length - 1]); 594 setStartAndStopRows(scan, startPrefixRow, lastPrefixRow); 595 } 596 } 597 598 private static void setStartAndStopRows(Scan scan, byte[] startPrefixRow, byte[] lastPrefixRow) { 599 scan.withStartRow(startPrefixRow); 600 byte[] stopRow = Bytes.add(Bytes.head(lastPrefixRow, lastPrefixRow.length - 1), 601 new byte[] { (byte) (lastPrefixRow[lastPrefixRow.length - 1] + 1) }); 602 scan.withStopRow(stopRow); 603 } 604 605 public boolean doCommandLine(final String[] args) { 606 if (args.length < 2) { 607 printUsage(null); 608 return false; 609 } 610 try { 611 for (int i = 0; i < args.length; i++) { 612 String cmd = args[i]; 613 if (cmd.equals("-h") || cmd.startsWith("--h")) { 614 printUsage(null); 615 return false; 616 } 617 618 final String startTimeArgKey = "--starttime="; 619 if (cmd.startsWith(startTimeArgKey)) { 620 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length())); 621 continue; 622 } 623 624 final String endTimeArgKey = "--endtime="; 625 if (cmd.startsWith(endTimeArgKey)) { 626 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length())); 627 continue; 628 } 629 630 final String includeDeletedCellsArgKey = "--raw"; 631 if (cmd.equals(includeDeletedCellsArgKey)) { 632 includeDeletedCells = true; 633 continue; 634 } 635 636 final String versionsArgKey = "--versions="; 637 if (cmd.startsWith(versionsArgKey)) { 638 versions = Integer.parseInt(cmd.substring(versionsArgKey.length())); 639 continue; 640 } 641 642 final String batchArgKey = "--batch="; 643 if (cmd.startsWith(batchArgKey)) { 644 batch = Integer.parseInt(cmd.substring(batchArgKey.length())); 645 continue; 646 } 647 648 final String familiesArgKey = "--families="; 649 if (cmd.startsWith(familiesArgKey)) { 650 families = cmd.substring(familiesArgKey.length()); 651 continue; 652 } 653 654 final String rowPrefixesKey = "--row-prefixes="; 655 if (cmd.startsWith(rowPrefixesKey)) { 656 rowPrefixes = cmd.substring(rowPrefixesKey.length()); 657 continue; 658 } 659 660 final String delimiterArgKey = "--delimiter="; 661 if (cmd.startsWith(delimiterArgKey)) { 662 delimiter = cmd.substring(delimiterArgKey.length()); 663 continue; 664 } 665 666 final String deprecatedSleepToReCompareKey = "--recomparesleep="; 667 final String sleepToReCompareKey = "--recompareSleep="; 668 if (cmd.startsWith(deprecatedSleepToReCompareKey)) { 669 LOG.warn("--recomparesleep is deprecated and will be removed in 4.0.0." 670 + " Use --recompareSleep instead."); 671 sleepMsBeforeReCompare = 672 Integer.parseInt(cmd.substring(deprecatedSleepToReCompareKey.length())); 673 continue; 674 } 675 if (cmd.startsWith(sleepToReCompareKey)) { 676 sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length())); 677 continue; 678 } 679 680 final String verboseKey = "--verbose"; 681 if (cmd.startsWith(verboseKey)) { 682 verbose = true; 683 continue; 684 } 685 686 final String sourceSnapshotNameArgKey = "--sourceSnapshotName="; 687 if (cmd.startsWith(sourceSnapshotNameArgKey)) { 688 sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length()); 689 continue; 690 } 691 692 final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir="; 693 if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) { 694 sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length()); 695 continue; 696 } 697 698 final String peerSnapshotNameArgKey = "--peerSnapshotName="; 699 if (cmd.startsWith(peerSnapshotNameArgKey)) { 700 peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length()); 701 continue; 702 } 703 704 final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir="; 705 if (cmd.startsWith(peerSnapshotTmpDirArgKey)) { 706 peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length()); 707 continue; 708 } 709 710 final String peerFSAddressArgKey = "--peerFSAddress="; 711 if (cmd.startsWith(peerFSAddressArgKey)) { 712 peerFSAddress = cmd.substring(peerFSAddressArgKey.length()); 713 continue; 714 } 715 716 final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress="; 717 if (cmd.startsWith(peerHBaseRootAddressArgKey)) { 718 peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length()); 719 continue; 720 } 721 722 final String peerTableNameArgKey = "--peerTableName="; 723 if (cmd.startsWith(peerTableNameArgKey)) { 724 peerTableName = cmd.substring(peerTableNameArgKey.length()); 725 continue; 726 } 727 728 final String reCompareThreadArgs = "--recompareThreads="; 729 if (cmd.startsWith(reCompareThreadArgs)) { 730 reCompareThreads = Integer.parseInt(cmd.substring(reCompareThreadArgs.length())); 731 continue; 732 } 733 734 final String reCompareTriesKey = "--recompareTries="; 735 if (cmd.startsWith(reCompareTriesKey)) { 736 reCompareTries = Integer.parseInt(cmd.substring(reCompareTriesKey.length())); 737 continue; 738 } 739 740 final String reCompareBackoffExponentKey = "--recompareBackoffExponent="; 741 if (cmd.startsWith(reCompareBackoffExponentKey)) { 742 reCompareBackoffExponent = 743 Integer.parseInt(cmd.substring(reCompareBackoffExponentKey.length())); 744 continue; 745 } 746 747 if (cmd.startsWith("--")) { 748 printUsage("Invalid argument '" + cmd + "'"); 749 return false; 750 } 751 752 if (i == args.length - 2) { 753 if (isPeerQuorumAddress(cmd)) { 754 peerQuorumAddress = cmd; 755 } else { 756 peerId = cmd; 757 } 758 } 759 760 if (i == args.length - 1) { 761 tableName = cmd; 762 } 763 } 764 765 if ( 766 (sourceSnapshotName != null && sourceSnapshotTmpDir == null) 767 || (sourceSnapshotName == null && sourceSnapshotTmpDir != null) 768 ) { 769 printUsage("Source snapshot name and snapshot temp location should be provided" 770 + " to use snapshots in source cluster"); 771 return false; 772 } 773 774 if ( 775 peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null 776 || peerHBaseRootAddress != null 777 ) { 778 if ( 779 peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null 780 || peerHBaseRootAddress == null 781 ) { 782 printUsage( 783 "Peer snapshot name, peer snapshot temp location, Peer HBase root address and " 784 + "peer FSAddress should be provided to use snapshots in peer cluster"); 785 return false; 786 } 787 } 788 789 // This is to avoid making recompare calls to source/peer tables when snapshots are used 790 if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { 791 printUsage( 792 "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are" 793 + " immutable"); 794 return false; 795 } 796 797 } catch (Exception e) { 798 LOG.error("Failed to parse commandLine arguments", e); 799 printUsage("Can't start because " + e.getMessage()); 800 return false; 801 } 802 return true; 803 } 804 805 private boolean isPeerQuorumAddress(String cmd) { 806 if (ConnectionRegistryFactory.tryParseAsConnectionURI(cmd) != null) { 807 return true; 808 } 809 try { 810 ZKConfig.validateClusterKey(cmd); 811 } catch (IOException e) { 812 // not a quorum address 813 return false; 814 } 815 return true; 816 } 817 818 /* 819 * @param errorMsg Error message. Can be null. 820 */ 821 private static void printUsage(final String errorMsg) { 822 if (errorMsg != null && errorMsg.length() > 0) { 823 System.err.println("ERROR: " + errorMsg); 824 } 825 System.err.println("Usage: verifyrep [--starttime=X]" 826 + " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recompareSleep=] " 827 + "[--recompareThreads=] [--recompareTries=] [--recompareBackoffExponent=]" 828 + "[--batch=] [--verbose] [--peerTableName=] [--sourceSnapshotName=P] " 829 + "[--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] [--peerSnapshotTmpDir=S] " 830 + "[--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>"); 831 System.err.println(); 832 System.err.println("Options:"); 833 System.err.println(" starttime beginning of the time range"); 834 System.err.println(" without endtime means from starttime to forever"); 835 System.err.println(" endtime end of the time range"); 836 System.err.println(" versions number of cell versions to verify"); 837 System.err.println(" batch batch count for scan, note that" 838 + " result row counts will no longer be actual number of rows when you use this option"); 839 System.err.println(" raw includes raw scan if given in options"); 840 System.err.println(" families comma-separated list of families to copy"); 841 System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); 842 System.err.println(" delimiter the delimiter used in display around rowkey"); 843 System.err.println(" recompareSleep milliseconds to sleep before recompare row, " 844 + "default value is 0 which disables the recompare."); 845 System.err.println(" recompareThreads number of threads to run recompares in"); 846 System.err.println(" recompareTries number of recompare attempts before incrementing " 847 + "the BADROWS counter. Defaults to 1 recompare"); 848 System.out.println(" recompareBackoffExponent exponential multiplier to increase " 849 + "recompareSleep after each recompare attempt, " 850 + "default value is 0 which results in a constant sleep time"); 851 System.err.println(" verbose logs row keys of good rows"); 852 System.err.println(" peerTableName Peer Table Name"); 853 System.err.println(" sourceSnapshotName Source Snapshot Name"); 854 System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot"); 855 System.err.println(" peerSnapshotName Peer Snapshot Name"); 856 System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot"); 857 System.err.println(" peerFSAddress Peer cluster Hadoop FS address"); 858 System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); 859 System.err.println(); 860 System.err.println("Args:"); 861 System.err.println(" peerid Id of the peer used for verification," 862 + " must match the one given for replication"); 863 System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The " 864 + "format is zk_quorum:zk_port:zk_hbase_path"); 865 System.err.println(" tablename Name of the table to verify"); 866 System.err.println(); 867 System.err.println("Examples:"); 868 System.err 869 .println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); 870 System.err 871 .println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" 872 + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); 873 System.err.println(); 874 System.err.println( 875 " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b"); 876 System.err.println(" Assume quorum address for cluster-b is" 877 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b"); 878 System.err 879 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 880 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 881 + "2181:/cluster-b \\\n" + " TestTable"); 882 System.err.println(); 883 System.err 884 .println(" To verify the data in TestTable between the secured cluster runs VerifyReplication" 885 + " and insecure cluster-b"); 886 System.err 887 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 888 + " -D verifyrep.peer.hbase.security.authentication=simple \\\n" 889 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 890 + "2181:/cluster-b \\\n" + " TestTable"); 891 System.err.println(); 892 System.err.println(" To verify the data in TestTable between" 893 + " the secured cluster runs VerifyReplication and secured cluster-b"); 894 System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E" 895 + ", for master and regionserver kerberos principal from another cluster"); 896 System.err 897 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 898 + " -D verifyrep.peer.hbase.regionserver.kerberos.principal=" 899 + "cluster-b/_HOST@EXAMPLE.COM \\\n" 900 + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" 901 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 902 + "2181:/cluster-b \\\n" + " TestTable"); 903 System.err.println(); 904 System.err.println( 905 " To verify the data in TestTable between the insecure cluster runs VerifyReplication" 906 + " and secured cluster-b"); 907 System.err 908 .println(" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" 909 + " -D verifyrep.peer.hbase.security.authentication=kerberos \\\n" 910 + " -D verifyrep.peer.hbase.regionserver.kerberos.principal=" 911 + "cluster-b/_HOST@EXAMPLE.COM \\\n" 912 + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" 913 + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" 914 + "2181:/cluster-b \\\n" + " TestTable"); 915 } 916 917 private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) { 918 if (maxThreads == 0) { 919 return null; 920 } 921 922 return new ThreadPoolExecutor(0, maxThreads, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), 923 buildRejectedReComparePolicy(context)); 924 } 925 926 private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) { 927 return new CallerRunsPolicy() { 928 @Override 929 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { 930 LOG.debug("Re-comparison execution rejected. Running in main thread."); 931 context.getCounter(Counters.MAIN_THREAD_RECOMPARES).increment(1); 932 // will run in the current thread 933 super.rejectedExecution(runnable, e); 934 } 935 }; 936 } 937 938 @Override 939 public int run(String[] args) throws Exception { 940 Configuration conf = this.getConf(); 941 Job job = createSubmittableJob(conf, args); 942 if (job != null) { 943 return job.waitForCompletion(true) ? 0 : 1; 944 } 945 return 1; 946 } 947 948 /** 949 * Main entry point. 950 * @param args The command line parameters. 951 * @throws Exception When running the job fails. 952 */ 953 public static void main(String[] args) throws Exception { 954 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args); 955 System.exit(res); 956 } 957}