001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase;
020
021import java.io.IOException;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.testclassification.IntegrationTests;
027import org.apache.hadoop.hbase.util.ConstantDelayQueue;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.hadoop.hbase.util.LoadTestTool;
030import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
031import org.apache.hadoop.hbase.util.MultiThreadedWriter;
032import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
033import org.apache.hadoop.hbase.util.TableDescriptorChecker;
034import org.apache.hadoop.hbase.util.Threads;
035import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
036import org.apache.hadoop.util.StringUtils;
037import org.apache.hadoop.util.ToolRunner;
038import org.junit.Assert;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043
044/**
045 * Integration test for testing async wal replication to secondary region replicas. Sets up a table
046 * with given region replication (default 2), and uses LoadTestTool client writer, updater and
047 * reader threads for writes and reads and verification. It uses a delay queue with a given delay
048 * ("read_delay_ms", default 5000ms) between the writer/updater and reader threads to make the
049 * written items available to readers. This means that a reader will only start reading from a row
050 * written by the writer / updater after 5secs has passed. The reader thread performs the reads from
051 * the given region replica id (default 1) to perform the reads. Async wal replication has to finish
052 * with the replication of the edits before read_delay_ms to the given region replica id so that
053 * the read and verify will not fail.
054 *
055 * The job will run for <b>at least</b> given runtime (default 10min) by running a concurrent
056 * writer and reader workload followed by a concurrent updater and reader workload for
057 * num_keys_per_server.
058 * <p>
059 * Example usage:
060 * </p>
061 * <pre>
062 * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaReplication
063 * -DIntegrationTestRegionReplicaReplication.num_keys_per_server=10000
064 * -Dhbase.IntegrationTestRegionReplicaReplication.runtime=600000
065 * -DIntegrationTestRegionReplicaReplication.read_delay_ms=5000
066 * -DIntegrationTestRegionReplicaReplication.region_replication=3
067 * -DIntegrationTestRegionReplicaReplication.region_replica_id=2
068 * -DIntegrationTestRegionReplicaReplication.num_read_threads=100
069 * -DIntegrationTestRegionReplicaReplication.num_write_threads=100
070 * </pre>
071 */
072@Category(IntegrationTests.class)
073public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
074
075  private static final String TEST_NAME
076    = IntegrationTestRegionReplicaReplication.class.getSimpleName();
077
078  private static final String OPT_READ_DELAY_MS = "read_delay_ms";
079
080  private static final int DEFAULT_REGION_REPLICATION = 2;
081  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
082  private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
083
084  @Override
085  protected int getMinServerCount() {
086    return SERVER_COUNT;
087  }
088
089  @Override
090  public void setConf(Configuration conf) {
091    conf.setIfUnset(
092      String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
093      String.valueOf(DEFAULT_REGION_REPLICATION));
094
095    conf.setIfUnset(
096      String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
097      StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
098
099    conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true);
100
101    // enable async wal replication to region replicas for unit tests
102    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
103
104    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4); // flush every 4 MB
105    conf.setInt("hbase.hstore.blockingStoreFiles", 100);
106
107    super.setConf(conf);
108  }
109
110  @Override
111  @Test
112  public void testIngest() throws Exception {
113    runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
114  }
115
116  /**
117   * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
118   * threads to become available to the MultiThradedReader threads. We add this delay because of
119   * the async nature of the wal replication to region replicas.
120   */
121  public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
122    private long delayMs;
123    public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
124        TableName tableName) throws IOException {
125      super(dataGen, conf, tableName);
126    }
127    @Override
128    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
129      this.delayMs = conf.getLong(String.format("%s.%s",
130        IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
131      return new ConstantDelayQueue<>(TimeUnit.MILLISECONDS, delayMs);
132    }
133  }
134
135  /**
136   * This extends MultiThreadedWriter to add a configurable delay to the keys written by the writer
137   * threads to become available to the MultiThradedReader threads. We add this delay because of
138   * the async nature of the wal replication to region replicas.
139   */
140  public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
141    private long delayMs;
142    public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
143        TableName tableName, double updatePercent) throws IOException {
144      super(dataGen, conf, tableName, updatePercent);
145    }
146    @Override
147    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
148      this.delayMs = conf.getLong(String.format("%s.%s",
149        IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
150      return new ConstantDelayQueue<>(TimeUnit.MILLISECONDS, delayMs);
151    }
152  }
153
154  @Override
155  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
156      int recordSize, int writeThreads, int readThreads) throws Exception {
157
158    LOG.info("Running ingest");
159    LOG.info("Cluster size:" + util.getHBaseClusterInterface()
160      .getClusterMetrics().getLiveServerMetrics().size());
161
162    // sleep for some time so that the cache for disabled tables does not interfere.
163    Threads.sleep(
164      getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
165        5000) + 1000);
166
167    long start = EnvironmentEdgeManager.currentTime();
168    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
169    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
170    long startKey = 0;
171
172    long numKeys = getNumKeys(keysPerServerPerIter);
173    while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) {
174      LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
175          ((runtime - (EnvironmentEdgeManager.currentTime() - start))/60000) + " min");
176
177      int verifyPercent = 100;
178      int updatePercent = 20;
179      int ret = -1;
180      int regionReplicaId = conf.getInt(String.format("%s.%s"
181        , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
182
183      // we will run writers and readers at the same time.
184      List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
185      args.add("-write");
186      args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
187      args.add("-" + LoadTestTool.OPT_MULTIPUT);
188      args.add("-writer");
189      args.add(DelayingMultiThreadedWriter.class.getName()); // inject writer class
190      args.add("-read");
191      args.add(String.format("%d:%d", verifyPercent, readThreads));
192      args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
193      args.add(String.valueOf(regionReplicaId));
194
195      ret = loadTool.run(args.toArray(new String[args.size()]));
196      if (0 != ret) {
197        String errorMsg = "Load failed with error code " + ret;
198        LOG.error(errorMsg);
199        Assert.fail(errorMsg);
200      }
201
202      args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
203      args.add("-update");
204      args.add(String.format("%s:%s:1", updatePercent, writeThreads));
205      args.add("-updater");
206      args.add(DelayingMultiThreadedUpdater.class.getName()); // inject updater class
207      args.add("-read");
208      args.add(String.format("%d:%d", verifyPercent, readThreads));
209      args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
210      args.add(String.valueOf(regionReplicaId));
211
212      ret = loadTool.run(args.toArray(new String[args.size()]));
213      if (0 != ret) {
214        String errorMsg = "Load failed with error code " + ret;
215        LOG.error(errorMsg);
216        Assert.fail(errorMsg);
217      }
218      startKey += numKeys;
219    }
220  }
221
222  public static void main(String[] args) throws Exception {
223    Configuration conf = HBaseConfiguration.create();
224    IntegrationTestingUtility.setUseDistributedCluster(conf);
225    int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
226    System.exit(ret);
227  }
228}