/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * This is an integration test for replication. It is derived from
 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}: it creates a large circular
 * linked list in a source cluster and verifies that the data arrives correctly in a sink cluster.
 * The test handles creating the tables and schema and setting up the replication.
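 * <p>
 * A sketch of an example invocation, assuming the tool is launched through the {@code hbase}
 * command; the cluster keys, output path and option values below are illustrative only (the
 * option flags are defined in {@code addOptions()}):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestReplication \
 *   -s srczk1:2181:/hbase -r sinkzk1:2181:/hbase -d /tmp/ITReplication \
 *   -n 2000000 -w 100000 -wm 20 -nm 2 -nr 2 -i 1 -t 120
 * </pre>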
 */
public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
  protected String sourceClusterIdString;
  protected String sinkClusterIdString;
  protected int numIterations;
  protected int numMappers;
  protected long numNodes;
  protected String outputDir;
  protected int numReducers;
  protected int generateVerifyGap;
  protected Integer width;
  protected Integer wrapMultiplier;
  protected boolean noReplicationSetup = false;

  private final String SOURCE_CLUSTER_OPT = "sourceCluster";
  private final String DEST_CLUSTER_OPT = "destCluster";
  private final String ITERATIONS_OPT = "iterations";
  private final String NUM_MAPPERS_OPT = "numMappers";
  private final String OUTPUT_DIR_OPT = "outputDir";
  private final String NUM_REDUCERS_OPT = "numReducers";
  private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup";

  /**
   * The gap (in seconds) from when data is finished being generated at the source to when it can
   * be verified. This is the replication lag we are willing to tolerate.
   */
  private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";

  /**
   * The width of the linked list. See
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String WIDTH_OPT = "width";

  /**
   * The number of rows after which the linked list points to the first row. See
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";

  /**
   * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH in
   * order to ensure that the linked list is complete. See
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String NUM_NODES_OPT = "numNodes";

  private final int DEFAULT_NUM_MAPPERS = 1;
  private final int DEFAULT_NUM_REDUCERS = 1;
  private final int DEFAULT_NUM_ITERATIONS = 1;
  private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
  private final int DEFAULT_WIDTH = 1000000;
  private final int DEFAULT_WRAP_MULTIPLIER = 25;
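  // 1,000,000 * 25 = 25,000,000 nodes by default, i.e. exactly one full wrap of the linked list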
  private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;

  /**
   * Wrapper around an HBase ClusterID allowing us to get admin connections and configurations for
   * it
   */
  protected static class ClusterID {
    private final Configuration configuration;
    private Connection connection = null;

    /**
     * This creates a new ClusterID wrapper that will automatically build connections and
     * configurations to be able to talk to the specified cluster
     * @param base the base configuration that this class will add to
     * @param key  the cluster key in the form of zk_quorum:zk_port:zk_parent_node
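     *             (an illustrative example: {@code zk1.example.com:2181:/hbase})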
     */
    public ClusterID(Configuration base, String key) {
      configuration = new Configuration(base);
      Iterator<String> iter = Splitter.on(':').split(key).iterator();
      configuration.set(HConstants.ZOOKEEPER_QUORUM, iter.next());
      configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, iter.next());
      configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, iter.next());
    }

    @Override
    public String toString() {
      return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
        configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
    }

    public Configuration getConfiguration() {
      return this.configuration;
    }

    public Connection getConnection() throws Exception {
      if (this.connection == null) {
        this.connection = ConnectionFactory.createConnection(this.configuration);
      }
      return this.connection;
    }

    public void closeConnection() throws Exception {
      this.connection.close();
      this.connection = null;
    }

    @Override
    public boolean equals(Object other) {
      if (this == other) {
        return true;
      }
      if (!(other instanceof ClusterID)) {
        return false;
      }
      return toString().equalsIgnoreCase(other.toString());
    }

    @Override
    public int hashCode() {
      return toString().hashCode();
    }
  }

  /**
   * The main runner loop for the test. It uses
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for the generation and
   * verification of the linked list. It is heavily based on
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop}
   */
  protected class VerifyReplicationLoop extends Configured implements Tool {
    private final Logger LOG = LoggerFactory.getLogger(VerifyReplicationLoop.class);
    protected ClusterID source;
    protected ClusterID sink;

    IntegrationTestBigLinkedList integrationTestBigLinkedList;

    /**
     * This tears down any tables that existed from before and rebuilds the tables and schemas on
     * the source cluster. It then sets up replication from the source to the sink cluster by using
     * the {@link org.apache.hadoop.hbase.client.Admin} replication API.
     */
    protected void setupTablesAndReplication() throws Exception {
      TableName tableName = getTableName(source.getConfiguration());

      ClusterID[] clusters = { source, sink };

      // delete any old tables in the source and sink
      for (ClusterID cluster : clusters) {
        Admin admin = cluster.getConnection().getAdmin();

        if (admin.tableExists(tableName)) {
          if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
          }

          /**
           * TODO: This is a workaround for a replication bug (HBASE-13416). When we recreate a
           * table with the same name as one that has recently been deleted, the contents of the
           * old WALs are replayed even though they should not be. Rolling the WALs before the
           * table gets deleted prevents this. Eventually the bug should be fixed and this should
           * be removed.
           */
          Set<ServerName> regionServers = new TreeSet<>();
          for (HRegionLocation rl : cluster.getConnection().getRegionLocator(tableName)
            .getAllRegionLocations()) {
            regionServers.add(rl.getServerName());
          }

          for (ServerName server : regionServers) {
            source.getConnection().getAdmin().rollWALWriter(server);
          }

          admin.deleteTable(tableName);
        }
      }

      // create the schema
      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());
      generator.createSchema();

      // setup the replication on the source
      if (!source.equals(sink)) {
        try (final Admin admin = source.getConnection().getAdmin()) {
          // remove any old replication peers
          for (ReplicationPeerDescription peer : admin.listReplicationPeers()) {
            admin.removeReplicationPeer(peer.getPeerId());
          }

          // set the test table to be the table to replicate
          HashMap<TableName, List<String>> toReplicate = new HashMap<>();
          toReplicate.put(tableName, Collections.emptyList());

          // set the sink to be the target
          final ReplicationPeerConfig peerConfig =
            ReplicationPeerConfig.newBuilder().setClusterKey(sink.toString())
              .setReplicateAllUserTables(false).setTableCFsMap(toReplicate).build();

          admin.addReplicationPeer("TestPeer", peerConfig);
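          // enableTableReplication is expected to create the test table on the peer cluster if it
          // does not exist yet and to mark the table's column families for replication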
          admin.enableTableReplication(tableName);
        }
      }

      for (ClusterID cluster : clusters) {
        cluster.closeConnection();
      }
    }

    protected void waitForReplication() throws Exception {
      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
      // and wait for them to report 0 replication lag.
      Thread.sleep(generateVerifyGap * 1000);
    }

    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the
     * source cluster. This assumes that the tables have been set up via setupTablesAndReplication.
     */
    protected void runGenerator() throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = util.getRandomUUID(); // create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());

      // Disable concurrent walkers for IntegrationTestReplication
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify} in the sink
     * cluster. If replication is working properly the data written at the source cluster should be
     * available in the sink cluster after a reasonable gap.
     * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster
     */
    protected void runVerify(long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = util.getRandomUUID(); // create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(sink.getConfiguration());

      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    /**
     * The main test runner. This test has four steps:
     * <ol>
     * <li>setupTablesAndReplication</li>
     * <li>generate the data into the source cluster</li>
     * <li>wait for replication to propagate</li>
     * <li>verify that the data is available in the sink cluster</li>
     * </ol>
     * @param args should be empty
     * @return 0 on success
     * @throws Exception on an error
     */
    @Override
    public int run(String[] args) throws Exception {
      source = new ClusterID(getConf(), sourceClusterIdString);
      sink = new ClusterID(getConf(), sinkClusterIdString);

      if (!noReplicationSetup) {
        setupTablesAndReplication();
      }
      long expectedNumNodes = 0;
      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);

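        // Each iteration writes numMappers * numNodes new rows, so verification checks the
        // cumulative total written so far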
        expectedNumNodes += numMappers * numNodes;

        runGenerator();
        waitForReplication();
        runVerify(expectedNumNodes);
      }

      /**
       * We always return 0 here because any error in the verification step results in an
       * exception being thrown.
       */
      return 0;
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
      "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
    addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
      "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
    addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
      "Temporary directory where to write keys for the test");

    addOptWithArg("nm", NUM_MAPPERS_OPT,
      "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
    addOptWithArg("nr", NUM_REDUCERS_OPT,
      "Number of reducers (default: " + DEFAULT_NUM_REDUCERS + ")");
    addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT,
      "Don't set up tables or configure replication before starting the test");
    addOptWithArg("n", NUM_NODES_OPT,
      "Number of nodes. This should be a multiple of width * wrapMultiplier." + " (default: "
        + DEFAULT_NUM_NODES + ")");
    addOptWithArg("i", ITERATIONS_OPT,
      "Number of iterations to run (default: " + DEFAULT_NUM_ITERATIONS + ")");
    addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
      "Gap between generate and verify steps in seconds (default: " + DEFAULT_GENERATE_VERIFY_GAP
        + ")");
    addOptWithArg("w", WIDTH_OPT,
      "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
    addOptWithArg("wm", WRAP_MULTIPLIER_OPT,
      "How many times to wrap around (default: " + DEFAULT_WRAP_MULTIPLIER + ")");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    processBaseOptions(cmd);

    sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
    sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
    outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);

    /** This uses parseInt from {@link org.apache.hadoop.hbase.util.AbstractHBaseTool} */
    numMappers =
      parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT, Integer.toString(DEFAULT_NUM_MAPPERS)), 1,
        Integer.MAX_VALUE);
    numReducers =
      parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT, Integer.toString(DEFAULT_NUM_REDUCERS)), 1,
        Integer.MAX_VALUE);
    numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)), 1,
      Integer.MAX_VALUE);
    generateVerifyGap = parseInt(
      cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT, Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)), 1,
      Integer.MAX_VALUE);
    numIterations =
      parseInt(cmd.getOptionValue(ITERATIONS_OPT, Integer.toString(DEFAULT_NUM_ITERATIONS)), 1,
        Integer.MAX_VALUE);
    width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)), 1,
      Integer.MAX_VALUE);
    wrapMultiplier =
      parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT, Integer.toString(DEFAULT_WRAP_MULTIPLIER)),
        1, Integer.MAX_VALUE);

    if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) {
      noReplicationSetup = true;
    }

    if (numNodes % (width * wrapMultiplier) != 0) {
      throw new RuntimeException("numNodes must be a multiple of width * wrapMultiplier");
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    VerifyReplicationLoop tool = new VerifyReplicationLoop();
    tool.integrationTestBigLinkedList = this;
    return ToolRunner.run(getConf(), tool, null);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
    System.exit(ret);
  }
}