/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

/**
 * An integration test for replication. It is derived from
 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}: it creates a large circular
 * linked list in one cluster and verifies that the data is correct in a sink cluster. The test
 * handles creating the tables and schema and setting up the replication.
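 * <p>
 * An invocation might look like the following (illustrative only; the flags are
 * defined in {@link #addOptions()}):
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestReplication \
 *     -s source_zk:2181:/hbase -r sink_zk:2181:/hbase -d /tmp/replication-test
 * </pre>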
 */
public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
  protected String sourceClusterIdString;
  protected String sinkClusterIdString;
  protected int numIterations;
  protected int numMappers;
  protected long numNodes;
  protected String outputDir;
  protected int numReducers;
  protected int generateVerifyGap;
  protected Integer width;
  protected Integer wrapMultiplier;
  protected boolean noReplicationSetup = false;

  private final String SOURCE_CLUSTER_OPT = "sourceCluster";
  private final String DEST_CLUSTER_OPT = "destCluster";
  private final String ITERATIONS_OPT = "iterations";
  private final String NUM_MAPPERS_OPT = "numMappers";
  private final String OUTPUT_DIR_OPT = "outputDir";
  private final String NUM_REDUCERS_OPT = "numReducers";
  private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup";

  /**
   * The gap (in seconds) from when data is finished being generated at the source
   * to when it can be verified. This is the replication lag we are willing to tolerate.
   */
  private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";

  /**
   * The width of the linked list.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details.
   */
  private final String WIDTH_OPT = "width";

  /**
   * The number of rows after which the linked list points back to the first row.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details.
   */
  private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";

  /**
   * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH
   * in order to ensure that the linked list is complete.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details.
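   * With the defaults below (a width of 1,000,000 and a wrap multiplier of 25), for example,
   * this works out to 25,000,000 nodes.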
   */
  private final String NUM_NODES_OPT = "numNodes";

  private final int DEFAULT_NUM_MAPPERS = 1;
  private final int DEFAULT_NUM_REDUCERS = 1;
  private final int DEFAULT_NUM_ITERATIONS = 1;
  private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
  private final int DEFAULT_WIDTH = 1000000;
  private final int DEFAULT_WRAP_MULTIPLIER = 25;
  private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;

  /**
   * Wrapper around an HBase cluster ID that lets us get connections and
   * configurations for talking to the corresponding cluster.
   */
  protected class ClusterID {
    private final Configuration configuration;
    private Connection connection = null;

    /**
     * This creates a new ClusterID wrapper that will automatically build connections and
     * configurations to be able to talk to the specified cluster.
     *
     * @param base the base configuration that this class will add to
     * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node
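     *            (for example {@code localhost:2181:/hbase})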
     */
    public ClusterID(Configuration base, String key) {
      configuration = new Configuration(base);
      String[] parts = key.split(":");
      configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
      configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
      configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
    }

    @Override
    public String toString() {
      return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
                                 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
                                 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
    }

    public Configuration getConfiguration() {
      return this.configuration;
    }

    public Connection getConnection() throws Exception {
      if (this.connection == null) {
        this.connection = ConnectionFactory.createConnection(this.configuration);
      }
      return this.connection;
    }

    public void closeConnection() throws Exception {
      // guard against a connection that was never opened or was already closed
      if (this.connection != null) {
        this.connection.close();
        this.connection = null;
      }
    }

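    /**
     * Note that this overloads rather than overrides {@link Object#equals(Object)};
     * it is only ever used here for direct ClusterID-to-ClusterID comparisons.
     */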
    public boolean equals(ClusterID other) {
      return this.toString().equalsIgnoreCase(other.toString());
    }
  }

  /**
   * The main runner loop for the test. It uses
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}
   * for the generation and verification of the linked list. It is heavily based on
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop}.
   */
  protected class VerifyReplicationLoop extends Configured implements Tool {
    private final Logger LOG = LoggerFactory.getLogger(VerifyReplicationLoop.class);
    protected ClusterID source;
    protected ClusterID sink;

    IntegrationTestBigLinkedList integrationTestBigLinkedList;

    /**
     * This tears down any tables that existed from before and rebuilds the tables and schemas on
     * the source cluster. It then sets up replication from the source to the sink cluster by using
     * the {@link org.apache.hadoop.hbase.client.Admin} connection.
     *
     * @throws Exception on a connection or table-administration failure
     */
    protected void setupTablesAndReplication() throws Exception {
      TableName tableName = getTableName(source.getConfiguration());

      ClusterID[] clusters = {source, sink};

      // delete any old tables in the source and sink
      for (ClusterID cluster : clusters) {
        Admin admin = cluster.getConnection().getAdmin();

        if (admin.tableExists(tableName)) {
          if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
          }

          /*
           * TODO: This is a workaround for a replication bug (HBASE-13416).
           * When we recreate a table that has recently been deleted, the
           * contents of the logs are replayed even though they should not be.
           * Rolling the WAL writers here ensures the logs are flushed before
           * the table gets deleted. Once the bug is fixed this should be removed.
           */
          Set<ServerName> regionServers = new TreeSet<>();
          for (HRegionLocation rl :
               cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) {
            regionServers.add(rl.getServerName());
          }

          for (ServerName server : regionServers) {
            // roll on the cluster currently being cleaned up (not always the source)
            cluster.getConnection().getAdmin().rollWALWriter(server);
          }

          admin.deleteTable(tableName);
        }
      }

      // create the schema
      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());
      generator.createSchema();

      // setup the replication on the source
      if (!source.equals(sink)) {
        try (final Admin admin = source.getConnection().getAdmin()) {
          // remove any old replication peers
          for (ReplicationPeerDescription peer : admin.listReplicationPeers()) {
            admin.removeReplicationPeer(peer.getPeerId());
          }

          // set the test table to be the table to replicate
          HashMap<TableName, List<String>> toReplicate = new HashMap<>();
          toReplicate.put(tableName, Collections.emptyList());

          // set the sink to be the target
          final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
              .setClusterKey(sink.toString())
              .setReplicateAllUserTables(false)
              .setTableCFsMap(toReplicate).build();

          admin.addReplicationPeer("TestPeer", peerConfig);
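
          // For reference, the rough HBase shell equivalent of the peer setup above
          // (shell syntax varies across versions; shown only as an illustration):
          //   add_peer 'TestPeer', CLUSTER_KEY => "<zk_quorum>:<zk_port>:<znode_parent>",
          //     TABLE_CFS => { "<table>" => [] }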
          admin.enableTableReplication(tableName);
        }
      }

      for (ClusterID cluster : clusters) {
        cluster.closeConnection();
      }
    }

    protected void waitForReplication() throws Exception {
      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
      // and wait for them to report 0 replication lag.
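      //
      // A polling sketch for that TODO (a loose illustration only; it assumes the
      // HBase 2.x ClusterMetrics API, i.e. ServerMetrics#getReplicationLoadSourceList
      // and ReplicationLoadSource#getReplicationLag, and is untested):
      //
      //   try (Admin admin = source.getConnection().getAdmin()) {
      //     for (ServerMetrics metrics :
      //          admin.getClusterMetrics().getLiveServerMetrics().values()) {
      //       for (ReplicationLoadSource load : metrics.getReplicationLoadSourceList()) {
      //         // poll until load.getReplicationLag() reaches 0 for every source
      //       }
      //     }
      //   }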
      Thread.sleep(generateVerifyGap * 1000L);
    }

    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the
     * source cluster. This assumes that the tables have been set up via setupTablesAndReplication.
     *
     * @throws Exception on a generator failure
     */
    protected void runGenerator() throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = util.getRandomUUID(); // create a random UUID
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());

      // Disable concurrent walkers for IntegrationTestReplication
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify}
     * step in the sink cluster. If replication is working properly, the data written at the
     * source cluster should be available in the sink cluster after a reasonable gap.
     *
     * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster
     * @throws Exception if the verify job fails or the node count does not match
     */
    protected void runVerify(long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = util.getRandomUUID(); // create a random UUID
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(sink.getConfiguration());

      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    /**
     * The main test runner.
     *
     * This test has 4 steps:
     *  1: setupTablesAndReplication
     *  2: generate the data into the source cluster
     *  3: wait for replication to propagate
     *  4: verify that the data is available in the sink cluster
     *
     * @param args unused; may be empty or null
     * @return 0 on success
     * @throws Exception on an error
     */
    @Override
    public int run(String[] args) throws Exception {
      source = new ClusterID(getConf(), sourceClusterIdString);
      sink = new ClusterID(getConf(), sinkClusterIdString);

      if (!noReplicationSetup) {
        setupTablesAndReplication();
      }
      long expectedNumNodes = 0;
      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);

        expectedNumNodes += numMappers * numNodes;

        runGenerator();
        waitForReplication();
        runVerify(expectedNumNodes);
      }

      // we always return 0 here because exceptions are thrown when there is an error
      // in the verification step
      return 0;
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
                          "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
    addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
                          "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
    addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
                          "Temporary directory where to write keys for the test");

    addOptWithArg("nm", NUM_MAPPERS_OPT,
                  "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
    addOptWithArg("nr", NUM_REDUCERS_OPT,
                  "Number of reducers (default: " + DEFAULT_NUM_REDUCERS + ")");
    addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT,
                  "Don't set up tables or configure replication before starting the test");
    addOptWithArg("n", NUM_NODES_OPT,
                  "Number of nodes. This should be a multiple of width * wrapMultiplier." +
                  " (default: " + DEFAULT_NUM_NODES + ")");
    addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " +
                  DEFAULT_NUM_ITERATIONS + ")");
    addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
                  "Gap between generate and verify steps in seconds (default: " +
                  DEFAULT_GENERATE_VERIFY_GAP + ")");
    addOptWithArg("w", WIDTH_OPT,
                  "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
    addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " +
                  DEFAULT_WRAP_MULTIPLIER + ")");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    processBaseOptions(cmd);

    sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
    sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
    outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);

    // This uses parseInt from org.apache.hadoop.hbase.util.AbstractHBaseTool
    numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT,
                                             Integer.toString(DEFAULT_NUM_MAPPERS)),
                          1, Integer.MAX_VALUE);
    numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT,
                                              Integer.toString(DEFAULT_NUM_REDUCERS)),
                           1, Integer.MAX_VALUE);
    numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)),
                        1, Integer.MAX_VALUE);
    generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
                                                    Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)),
                                 1, Integer.MAX_VALUE);
    numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
                                                Integer.toString(DEFAULT_NUM_ITERATIONS)),
                             1, Integer.MAX_VALUE);
    width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)),
                     1, Integer.MAX_VALUE);
    wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT,
                                                 Integer.toString(DEFAULT_WRAP_MULTIPLIER)),
                              1, Integer.MAX_VALUE);

    if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) {
      noReplicationSetup = true;
    }

    if (numNodes % (width * wrapMultiplier) != 0) {
      throw new RuntimeException("numNodes must be a multiple of width * wrapMultiplier");
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    VerifyReplicationLoop tool = new VerifyReplicationLoop();
    tool.integrationTestBigLinkedList = this;
    return ToolRunner.run(getConf(), tool, null);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
    System.exit(ret);
  }
}