/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import java.util.Collections;
import java.util.HashMap;
045import java.util.List; 046import java.util.Set; 047import java.util.TreeSet; 048import java.util.UUID; 049 050 051/** 052 * This is an integration test for replication. It is derived off 053 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} that creates a large circular 054 * linked list in one cluster and verifies that the data is correct in a sink cluster. The test 055 * handles creating the tables and schema and setting up the replication. 056 */ 057public class IntegrationTestReplication extends IntegrationTestBigLinkedList { 058 protected String sourceClusterIdString; 059 protected String sinkClusterIdString; 060 protected int numIterations; 061 protected int numMappers; 062 protected long numNodes; 063 protected String outputDir; 064 protected int numReducers; 065 protected int generateVerifyGap; 066 protected Integer width; 067 protected Integer wrapMultiplier; 068 protected boolean noReplicationSetup = false; 069 070 private final String SOURCE_CLUSTER_OPT = "sourceCluster"; 071 private final String DEST_CLUSTER_OPT = "destCluster"; 072 private final String ITERATIONS_OPT = "iterations"; 073 private final String NUM_MAPPERS_OPT = "numMappers"; 074 private final String OUTPUT_DIR_OPT = "outputDir"; 075 private final String NUM_REDUCERS_OPT = "numReducers"; 076 private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup"; 077 078 /** 079 * The gap (in seconds) from when data is finished being generated at the source 080 * to when it can be verified. This is the replication lag we are willing to tolerate 081 */ 082 private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap"; 083 084 /** 085 * The width of the linked list. 086 * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 087 */ 088 private final String WIDTH_OPT = "width"; 089 090 /** 091 * The number of rows after which the linked list points to the first row. 
092 * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 093 */ 094 private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier"; 095 096 /** 097 * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH 098 * in order to ensure that the linked list can is complete. 099 * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details 100 */ 101 private final String NUM_NODES_OPT = "numNodes"; 102 103 private final int DEFAULT_NUM_MAPPERS = 1; 104 private final int DEFAULT_NUM_REDUCERS = 1; 105 private final int DEFAULT_NUM_ITERATIONS = 1; 106 private final int DEFAULT_GENERATE_VERIFY_GAP = 60; 107 private final int DEFAULT_WIDTH = 1000000; 108 private final int DEFAULT_WRAP_MULTIPLIER = 25; 109 private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER; 110 111 /** 112 * Wrapper around an HBase ClusterID allowing us 113 * to get admin connections and configurations for it 114 */ 115 protected class ClusterID { 116 private final Configuration configuration; 117 private Connection connection = null; 118 119 /** 120 * This creates a new ClusterID wrapper that will automatically build connections and 121 * configurations to be able to talk to the specified cluster 122 * 123 * @param base the base configuration that this class will add to 124 * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node 125 */ 126 public ClusterID(Configuration base, 127 String key) { 128 configuration = new Configuration(base); 129 String[] parts = key.split(":"); 130 configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); 131 configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]); 132 configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); 133 } 134 135 @Override 136 public String toString() { 137 return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM), 138 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT), 139 
configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); 140 } 141 142 public Configuration getConfiguration() { 143 return this.configuration; 144 } 145 146 public Connection getConnection() throws Exception { 147 if (this.connection == null) { 148 this.connection = ConnectionFactory.createConnection(this.configuration); 149 } 150 return this.connection; 151 } 152 153 public void closeConnection() throws Exception { 154 this.connection.close(); 155 this.connection = null; 156 } 157 158 public boolean equals(ClusterID other) { 159 return this.toString().equalsIgnoreCase(other.toString()); 160 } 161 } 162 163 /** 164 * The main runner loop for the test. It uses 165 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} 166 * for the generation and verification of the linked list. It is heavily based on 167 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop} 168 */ 169 protected class VerifyReplicationLoop extends Configured implements Tool { 170 private final Logger LOG = LoggerFactory.getLogger(VerifyReplicationLoop.class); 171 protected ClusterID source; 172 protected ClusterID sink; 173 174 IntegrationTestBigLinkedList integrationTestBigLinkedList; 175 176 /** 177 * This tears down any tables that existed from before and rebuilds the tables and schemas on 178 * the source cluster. It then sets up replication from the source to the sink cluster by using 179 * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin} 180 * connection. 
181 * 182 * @throws Exception 183 */ 184 protected void setupTablesAndReplication() throws Exception { 185 TableName tableName = getTableName(source.getConfiguration()); 186 187 ClusterID[] clusters = {source, sink}; 188 189 // delete any old tables in the source and sink 190 for (ClusterID cluster : clusters) { 191 Admin admin = cluster.getConnection().getAdmin(); 192 193 if (admin.tableExists(tableName)) { 194 if (admin.isTableEnabled(tableName)) { 195 admin.disableTable(tableName); 196 } 197 198 /** 199 * TODO: This is a work around on a replication bug (HBASE-13416) 200 * When we recreate a table against that has recently been 201 * deleted, the contents of the logs are replayed even though 202 * they should not. This ensures that we flush the logs 203 * before the table gets deleted. Eventually the bug should be 204 * fixed and this should be removed. 205 */ 206 Set<ServerName> regionServers = new TreeSet<>(); 207 for (HRegionLocation rl : 208 cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) { 209 regionServers.add(rl.getServerName()); 210 } 211 212 for (ServerName server : regionServers) { 213 source.getConnection().getAdmin().rollWALWriter(server); 214 } 215 216 admin.deleteTable(tableName); 217 } 218 } 219 220 // create the schema 221 Generator generator = new Generator(); 222 generator.setConf(source.getConfiguration()); 223 generator.createSchema(); 224 225 // setup the replication on the source 226 if (!source.equals(sink)) { 227 try (final Admin admin = source.getConnection().getAdmin()) { 228 // remove any old replication peers 229 for (ReplicationPeerDescription peer : admin.listReplicationPeers()) { 230 admin.removeReplicationPeer(peer.getPeerId()); 231 } 232 233 // set the test table to be the table to replicate 234 HashMap<TableName, List<String>> toReplicate = new HashMap<>(); 235 toReplicate.put(tableName, Collections.emptyList()); 236 237 // set the sink to be the target 238 final ReplicationPeerConfig peerConfig = 
ReplicationPeerConfig.newBuilder() 239 .setClusterKey(sink.toString()) 240 .setReplicateAllUserTables(false) 241 .setTableCFsMap(toReplicate).build(); 242 243 admin.addReplicationPeer("TestPeer", peerConfig); 244 admin.enableTableReplication(tableName); 245 } 246 } 247 248 for (ClusterID cluster : clusters) { 249 cluster.closeConnection(); 250 } 251 } 252 253 protected void waitForReplication() throws Exception { 254 // TODO: we shouldn't be sleeping here. It would be better to query the region servers 255 // and wait for them to report 0 replication lag. 256 Thread.sleep(generateVerifyGap * 1000); 257 } 258 259 /** 260 * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the 261 * source cluster. This assumes that the tables have been setup via setupTablesAndReplication. 262 * 263 * @throws Exception 264 */ 265 protected void runGenerator() throws Exception { 266 Path outputPath = new Path(outputDir); 267 UUID uuid = UUID.randomUUID(); //create a random UUID. 268 Path generatorOutput = new Path(outputPath, uuid.toString()); 269 270 Generator generator = new Generator(); 271 generator.setConf(source.getConfiguration()); 272 273 // Disable concurrent walkers for IntegrationTestReplication 274 int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0); 275 if (retCode > 0) { 276 throw new RuntimeException("Generator failed with return code: " + retCode); 277 } 278 } 279 280 281 /** 282 * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify} 283 * in the sink cluster. 
If replication is working properly the data written at the source 284 * cluster should be available in the sink cluster after a reasonable gap 285 * 286 * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster 287 * @throws Exception 288 */ 289 protected void runVerify(long expectedNumNodes) throws Exception { 290 Path outputPath = new Path(outputDir); 291 UUID uuid = UUID.randomUUID(); //create a random UUID. 292 Path iterationOutput = new Path(outputPath, uuid.toString()); 293 294 Verify verify = new Verify(); 295 verify.setConf(sink.getConfiguration()); 296 297 int retCode = verify.run(iterationOutput, numReducers); 298 if (retCode > 0) { 299 throw new RuntimeException("Verify.run failed with return code: " + retCode); 300 } 301 302 if (!verify.verify(expectedNumNodes)) { 303 throw new RuntimeException("Verify.verify failed"); 304 } 305 306 LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); 307 } 308 309 /** 310 * The main test runner 311 * 312 * This test has 4 steps: 313 * 1: setupTablesAndReplication 314 * 2: generate the data into the source cluster 315 * 3: wait for replication to propagate 316 * 4: verify that the data is available in the sink cluster 317 * 318 * @param args should be empty 319 * @return 0 on success 320 * @throws Exception on an error 321 */ 322 @Override 323 public int run(String[] args) throws Exception { 324 source = new ClusterID(getConf(), sourceClusterIdString); 325 sink = new ClusterID(getConf(), sinkClusterIdString); 326 327 if (!noReplicationSetup) { 328 setupTablesAndReplication(); 329 } 330 int expectedNumNodes = 0; 331 for (int i = 0; i < numIterations; i++) { 332 LOG.info("Starting iteration = " + i); 333 334 expectedNumNodes += numMappers * numNodes; 335 336 runGenerator(); 337 waitForReplication(); 338 runVerify(expectedNumNodes); 339 } 340 341 /** 342 * we are always returning 0 because exceptions are thrown when there is an error 343 * in the verification step. 
344 */ 345 return 0; 346 } 347 } 348 349 @Override 350 protected void addOptions() { 351 super.addOptions(); 352 addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT, 353 "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)"); 354 addRequiredOptWithArg("r", DEST_CLUSTER_OPT, 355 "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)"); 356 addRequiredOptWithArg("d", OUTPUT_DIR_OPT, 357 "Temporary directory where to write keys for the test"); 358 359 addOptWithArg("nm", NUM_MAPPERS_OPT, 360 "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")"); 361 addOptWithArg("nr", NUM_REDUCERS_OPT, 362 "Number of reducers (default: " + DEFAULT_NUM_MAPPERS + ")"); 363 addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT, 364 "Don't setup tables or configure replication before starting test"); 365 addOptWithArg("n", NUM_NODES_OPT, 366 "Number of nodes. This should be a multiple of width * wrapMultiplier." + 367 " (default: " + DEFAULT_NUM_NODES + ")"); 368 addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " + 369 DEFAULT_NUM_ITERATIONS + ")"); 370 addOptWithArg("t", GENERATE_VERIFY_GAP_OPT, 371 "Gap between generate and verify steps in seconds (default: " + 372 DEFAULT_GENERATE_VERIFY_GAP + ")"); 373 addOptWithArg("w", WIDTH_OPT, 374 "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")"); 375 addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " + 376 DEFAULT_WRAP_MULTIPLIER + ")"); 377 } 378 379 @Override 380 protected void processOptions(CommandLine cmd) { 381 processBaseOptions(cmd); 382 383 sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT); 384 sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT); 385 outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT); 386 387 /** This uses parseInt from {@link org.apache.hadoop.hbase.util.AbstractHBaseTool} */ 388 numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT, 389 Integer.toString(DEFAULT_NUM_MAPPERS)), 390 1, Integer.MAX_VALUE); 391 numReducers 
= parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT, 392 Integer.toString(DEFAULT_NUM_REDUCERS)), 393 1, Integer.MAX_VALUE); 394 numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)), 395 1, Integer.MAX_VALUE); 396 generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT, 397 Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)), 398 1, Integer.MAX_VALUE); 399 numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT, 400 Integer.toString(DEFAULT_NUM_ITERATIONS)), 401 1, Integer.MAX_VALUE); 402 width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)), 403 1, Integer.MAX_VALUE); 404 wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT, 405 Integer.toString(DEFAULT_WRAP_MULTIPLIER)), 406 1, Integer.MAX_VALUE); 407 408 if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) { 409 noReplicationSetup = true; 410 } 411 412 if (numNodes % (width * wrapMultiplier) != 0) { 413 throw new RuntimeException("numNodes must be a multiple of width and wrap multiplier"); 414 } 415 } 416 417 @Override 418 public int runTestFromCommandLine() throws Exception { 419 VerifyReplicationLoop tool = new VerifyReplicationLoop(); 420 tool.integrationTestBigLinkedList = this; 421 return ToolRunner.run(getConf(), tool, null); 422 } 423 424 public static void main(String[] args) throws Exception { 425 Configuration conf = HBaseConfiguration.create(); 426 IntegrationTestingUtility.setUseDistributedCluster(conf); 427 int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args); 428 System.exit(ret); 429 } 430}