001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.test;
020
021import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.conf.Configured;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.HBaseConfiguration;
026import org.apache.hadoop.hbase.HRegionLocation;
027import org.apache.hadoop.hbase.IntegrationTestingUtility;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.client.Connection;
032import org.apache.hadoop.hbase.client.ConnectionFactory;
033import org.apache.hadoop.hbase.client.Admin;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
036import org.apache.hadoop.util.Tool;
037import org.apache.hadoop.util.ToolRunner;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
042
043import java.util.Collections;
044import java.util.HashMap;
045import java.util.List;
046import java.util.Set;
047import java.util.TreeSet;
048import java.util.UUID;
049
050
051/**
 * This is an integration test for replication. It is derived from
 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}: it creates a large circular
 * linked list in a source cluster and verifies that the data is correct in a sink cluster. The
 * test handles creating the tables and schema and setting up the replication.
056 */
057public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
058  protected String sourceClusterIdString;
059  protected String sinkClusterIdString;
060  protected int numIterations;
061  protected int numMappers;
062  protected long numNodes;
063  protected String outputDir;
064  protected int numReducers;
065  protected int generateVerifyGap;
066  protected Integer width;
067  protected Integer wrapMultiplier;
068  protected boolean noReplicationSetup = false;
069
070  private final String SOURCE_CLUSTER_OPT = "sourceCluster";
071  private final String DEST_CLUSTER_OPT = "destCluster";
072  private final String ITERATIONS_OPT = "iterations";
073  private final String NUM_MAPPERS_OPT = "numMappers";
074  private final String OUTPUT_DIR_OPT = "outputDir";
075  private final String NUM_REDUCERS_OPT = "numReducers";
076  private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup";
077
078  /**
079   * The gap (in seconds) from when data is finished being generated at the source
080   * to when it can be verified. This is the replication lag we are willing to tolerate
081   */
082  private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";
083
084  /**
085   * The width of the linked list.
086   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
087   */
088  private final String WIDTH_OPT = "width";
089
090  /**
091   * The number of rows after which the linked list points to the first row.
092   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
093   */
094  private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";
095
096  /**
097   * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH
098   * in order to ensure that the linked list can is complete.
099   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
100   */
101  private final String NUM_NODES_OPT = "numNodes";
102
103  private final int DEFAULT_NUM_MAPPERS = 1;
104  private final int DEFAULT_NUM_REDUCERS = 1;
105  private final int DEFAULT_NUM_ITERATIONS = 1;
106  private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
107  private final int DEFAULT_WIDTH = 1000000;
108  private final int DEFAULT_WRAP_MULTIPLIER = 25;
109  private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;
110
111  /**
112   * Wrapper around an HBase ClusterID allowing us
113   * to get admin connections and configurations for it
114   */
115  protected class ClusterID {
116    private final Configuration configuration;
117    private Connection connection = null;
118
119    /**
120     * This creates a new ClusterID wrapper that will automatically build connections and
121     * configurations to be able to talk to the specified cluster
122     *
123     * @param base the base configuration that this class will add to
124     * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node
125     */
126    public ClusterID(Configuration base,
127                     String key) {
128      configuration = new Configuration(base);
129      String[] parts = key.split(":");
130      configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
131      configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
132      configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
133    }
134
135    @Override
136    public String toString() {
137      return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
138                                 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
139                                 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
140    }
141
142    public Configuration getConfiguration() {
143      return this.configuration;
144    }
145
146    public Connection getConnection() throws Exception {
147      if (this.connection == null) {
148        this.connection = ConnectionFactory.createConnection(this.configuration);
149      }
150      return this.connection;
151    }
152
153    public void closeConnection() throws Exception {
154      this.connection.close();
155      this.connection = null;
156    }
157
158    public boolean equals(ClusterID other) {
159      return this.toString().equalsIgnoreCase(other.toString());
160    }
161  }
162
163  /**
164   * The main runner loop for the test. It uses
165   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}
166   * for the generation and verification of the linked list. It is heavily based on
167   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop}
168   */
169  protected class VerifyReplicationLoop extends Configured implements Tool {
    // Logger used by run() and runVerify() to report progress.
    private final Logger LOG = LoggerFactory.getLogger(VerifyReplicationLoop.class);
    // Cluster the data is generated on; assigned at the start of run().
    protected ClusterID source;
    // Cluster the data is verified on; assigned at the start of run().
    protected ClusterID sink;

    // Set by runTestFromCommandLine() to the enclosing test instance; not read in this class.
    IntegrationTestBigLinkedList integrationTestBigLinkedList;
175
176    /**
177     * This tears down any tables that existed from before and rebuilds the tables and schemas on
178     * the source cluster. It then sets up replication from the source to the sink cluster by using
179     * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin}
180     * connection.
181     *
182     * @throws Exception
183     */
184    protected void setupTablesAndReplication() throws Exception {
185      TableName tableName = getTableName(source.getConfiguration());
186
187      ClusterID[] clusters = {source, sink};
188
189      // delete any old tables in the source and sink
190      for (ClusterID cluster : clusters) {
191        Admin admin = cluster.getConnection().getAdmin();
192
193        if (admin.tableExists(tableName)) {
194          if (admin.isTableEnabled(tableName)) {
195            admin.disableTable(tableName);
196          }
197
198          /**
199           * TODO: This is a work around on a replication bug (HBASE-13416)
200           * When we recreate a table against that has recently been
201           * deleted, the contents of the logs are replayed even though
202           * they should not. This ensures that we flush the logs
203           * before the table gets deleted. Eventually the bug should be
204           * fixed and this should be removed.
205           */
206          Set<ServerName> regionServers = new TreeSet<>();
207          for (HRegionLocation rl :
208               cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) {
209            regionServers.add(rl.getServerName());
210          }
211
212          for (ServerName server : regionServers) {
213            source.getConnection().getAdmin().rollWALWriter(server);
214          }
215
216          admin.deleteTable(tableName);
217        }
218      }
219
220      // create the schema
221      Generator generator = new Generator();
222      generator.setConf(source.getConfiguration());
223      generator.createSchema();
224
225      // setup the replication on the source
226      if (!source.equals(sink)) {
227        try (final Admin admin = source.getConnection().getAdmin()) {
228          // remove any old replication peers
229          for (ReplicationPeerDescription peer : admin.listReplicationPeers()) {
230            admin.removeReplicationPeer(peer.getPeerId());
231          }
232
233          // set the test table to be the table to replicate
234          HashMap<TableName, List<String>> toReplicate = new HashMap<>();
235          toReplicate.put(tableName, Collections.emptyList());
236
237          // set the sink to be the target
238          final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
239              .setClusterKey(sink.toString())
240              .setReplicateAllUserTables(false)
241              .setTableCFsMap(toReplicate).build();
242
243          admin.addReplicationPeer("TestPeer", peerConfig);
244          admin.enableTableReplication(tableName);
245        }
246      }
247
248      for (ClusterID cluster : clusters) {
249        cluster.closeConnection();
250      }
251    }
252
253    protected void waitForReplication() throws Exception {
254      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
255      // and wait for them to report 0 replication lag.
256      Thread.sleep(generateVerifyGap * 1000);
257    }
258
259    /**
260     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the
261     * source cluster. This assumes that the tables have been setup via setupTablesAndReplication.
262     *
263     * @throws Exception
264     */
265    protected void runGenerator() throws Exception {
266      Path outputPath = new Path(outputDir);
267      UUID uuid = UUID.randomUUID(); //create a random UUID.
268      Path generatorOutput = new Path(outputPath, uuid.toString());
269
270      Generator generator = new Generator();
271      generator.setConf(source.getConfiguration());
272
273      // Disable concurrent walkers for IntegrationTestReplication
274      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier, 0);
275      if (retCode > 0) {
276        throw new RuntimeException("Generator failed with return code: " + retCode);
277      }
278    }
279
280
281    /**
282     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify}
283     * in the sink cluster. If replication is working properly the data written at the source
284     * cluster should be available in the sink cluster after a reasonable gap
285     *
286     * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster
287     * @throws Exception
288     */
289    protected void runVerify(long expectedNumNodes) throws Exception {
290      Path outputPath = new Path(outputDir);
291      UUID uuid = UUID.randomUUID(); //create a random UUID.
292      Path iterationOutput = new Path(outputPath, uuid.toString());
293
294      Verify verify = new Verify();
295      verify.setConf(sink.getConfiguration());
296
297      int retCode = verify.run(iterationOutput, numReducers);
298      if (retCode > 0) {
299        throw new RuntimeException("Verify.run failed with return code: " + retCode);
300      }
301
302      if (!verify.verify(expectedNumNodes)) {
303        throw new RuntimeException("Verify.verify failed");
304      }
305
306      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
307    }
308
309    /**
310     * The main test runner
311     *
312     * This test has 4 steps:
313     *  1: setupTablesAndReplication
314     *  2: generate the data into the source cluster
315     *  3: wait for replication to propagate
316     *  4: verify that the data is available in the sink cluster
317     *
318     * @param args should be empty
319     * @return 0 on success
320     * @throws Exception on an error
321     */
322    @Override
323    public int run(String[] args) throws Exception {
324      source = new ClusterID(getConf(), sourceClusterIdString);
325      sink = new ClusterID(getConf(), sinkClusterIdString);
326
327      if (!noReplicationSetup) {
328        setupTablesAndReplication();
329      }
330      int expectedNumNodes = 0;
331      for (int i = 0; i < numIterations; i++) {
332        LOG.info("Starting iteration = " + i);
333
334        expectedNumNodes += numMappers * numNodes;
335
336        runGenerator();
337        waitForReplication();
338        runVerify(expectedNumNodes);
339      }
340
341      /**
342       * we are always returning 0 because exceptions are thrown when there is an error
343       * in the verification step.
344       */
345      return 0;
346    }
347  }
348
349  @Override
350  protected void addOptions() {
351    super.addOptions();
352    addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
353                          "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
354    addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
355                          "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
356    addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
357                          "Temporary directory where to write keys for the test");
358
359    addOptWithArg("nm", NUM_MAPPERS_OPT,
360                  "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
361    addOptWithArg("nr", NUM_REDUCERS_OPT,
362                  "Number of reducers (default: " + DEFAULT_NUM_MAPPERS + ")");
363    addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT,
364                  "Don't setup tables or configure replication before starting test");
365    addOptWithArg("n", NUM_NODES_OPT,
366                  "Number of nodes. This should be a multiple of width * wrapMultiplier."  +
367                  " (default: " + DEFAULT_NUM_NODES + ")");
368    addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " +
369                  DEFAULT_NUM_ITERATIONS +  ")");
370    addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
371                  "Gap between generate and verify steps in seconds (default: " +
372                  DEFAULT_GENERATE_VERIFY_GAP + ")");
373    addOptWithArg("w", WIDTH_OPT,
374                  "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
375    addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " +
376                  DEFAULT_WRAP_MULTIPLIER + ")");
377  }
378
379  @Override
380  protected void processOptions(CommandLine cmd) {
381    processBaseOptions(cmd);
382
383    sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
384    sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
385    outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);
386
387    /** This uses parseInt from {@link org.apache.hadoop.hbase.util.AbstractHBaseTool} */
388    numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT,
389                                             Integer.toString(DEFAULT_NUM_MAPPERS)),
390                          1, Integer.MAX_VALUE);
391    numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT,
392                                              Integer.toString(DEFAULT_NUM_REDUCERS)),
393                           1, Integer.MAX_VALUE);
394    numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)),
395                        1, Integer.MAX_VALUE);
396    generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
397                                                    Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)),
398                                 1, Integer.MAX_VALUE);
399    numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
400                                                Integer.toString(DEFAULT_NUM_ITERATIONS)),
401                             1, Integer.MAX_VALUE);
402    width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)),
403                                        1, Integer.MAX_VALUE);
404    wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT,
405                                                 Integer.toString(DEFAULT_WRAP_MULTIPLIER)),
406                              1, Integer.MAX_VALUE);
407
408    if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) {
409      noReplicationSetup = true;
410    }
411
412    if (numNodes % (width * wrapMultiplier) != 0) {
413      throw new RuntimeException("numNodes must be a multiple of width and wrap multiplier");
414    }
415  }
416
417  @Override
418  public int runTestFromCommandLine() throws Exception {
419    VerifyReplicationLoop tool = new  VerifyReplicationLoop();
420    tool.integrationTestBigLinkedList = this;
421    return ToolRunner.run(getConf(), tool, null);
422  }
423
424  public static void main(String[] args) throws Exception {
425    Configuration conf = HBaseConfiguration.create();
426    IntegrationTestingUtility.setUseDistributedCluster(conf);
427    int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
428    System.exit(ret);
429  }
430}