/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * A base class for tests that do something with the cluster while running
 * {@link LoadTestTool} to write and verify some data.
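 * <p>
 * A typical standalone run against an already-running distributed cluster might look like the
 * sketch below (the command form and property values are illustrative assumptions; the
 * configuration keys follow the {@code <test class simple name>.<option>} and
 * {@code hbase.<test class simple name>.runtime} patterns used in this class):
 * </p>
 * <pre>{@code
 * hbase org.apache.hadoop.hbase.IntegrationTestIngest \
 *     -D hbase.IntegrationTestIngest.runtime=600000 \
 *     -D IntegrationTestIngest.num_keys_per_server=2500
 * }</pre>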
 */
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
  public static final char HIPHEN = '-';
  private static final int SERVER_COUNT = 1; // number of region servers for the smallest cluster
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; // 20 minutes
  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000; // 10 minutes

  /** A soft limit on how long we should run; the %s is the test class's simple name. */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";

  protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
  protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;

  protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
  protected static final int DEFAULT_NUM_WRITE_THREADS = 20;

  protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
  protected static final int DEFAULT_NUM_READ_THREADS = 20;

  // The logger is also used by IntegrationTestIngestWithEncryption, hence it is protected.
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);
  protected IntegrationTestingUtility util;
  protected HBaseCluster cluster;
  protected LoadTestTool loadTool;

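  // Each option listed here can be supplied through the configuration as
  // "<test class simple name>.<option>"; getArgsForLoadTestToolInitTable() forwards any that
  // are set to LoadTestTool. For example (value is illustrative),
  // "IntegrationTestIngest.compression=GZ" would be passed on as "-compression GZ".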
  protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
      LoadTestTool.OPT_COLUMN_FAMILIES,
      LoadTestTool.OPT_COMPRESSION,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
      LoadTestTool.OPT_INMEMORY,
      LoadTestTool.OPT_ENCRYPTION,
      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
      LoadTestTool.OPT_REGION_REPLICATION,
  };

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has " + getMinServerCount() + " servers");
    util.initializeCluster(getMinServerCount());
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();
    loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());
    // Initialize load test tool before we start breaking things;
    // LoadTestTool init, even when it is a no-op, is very fragile.
    initTable();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

  protected void internalRunIngestTest(long runTime) throws Exception {
    String clazz = this.getClass().getSimpleName();
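    // Per-test overrides are read as "<simple class name>.<key>", e.g. a property named
    // "IntegrationTestIngest.num_keys_per_server" (the key names come from the constants above).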
    long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
      DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads = conf.getInt(
      String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads = conf.getInt(
      String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

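  // The target table defaults to the test class's simple name; it can be overridden through the
  // configuration property "<simple class name>." + LoadTestTool.OPT_TABLE_NAME.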
  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName.valueOf(
      conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    String clazz = this.getClass().getSimpleName();
    // Parse the conf for the column family names because LoadTestTool is not initialized yet.
    String familiesString = getConf().get(
      String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      for (String family : familiesString.split(",")) {
        families.add(family);
      }
    }

    return families;
  }

  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
      int recordSize, int writeThreads, int readThreads) throws Exception {

    LOG.info("Running ingest");
    LOG.info("Cluster size: " + util.getHBaseClusterInterface()
      .getClusterMetrics().getLiveServerMetrics().size());

    long start = System.currentTimeMillis();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (System.currentTimeMillis() - start < 0.9 * runtime) {
      LOG.info("Intended run time: " + (runtime / 60000) + " min, left: " +
          ((runtime - (System.currentTimeMillis() - start)) / 60000) + " min");

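      // Write phase: "-write <columns per key>:<record size in bytes>:<#writer threads>".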
      int ret = loadTool.run(getArgsForLoadTestTool("-write",
          String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Load failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

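      // Update phase: "-update 60:<threads>:1" updates 60% of the keys in the range just written,
      // reusing the writer thread count.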
      ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
          startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Update failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

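      // Read/verify phase: "-read 100:<threads>" verifies 100% of the keys in the range with the
      // given number of reader threads.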
      ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
          startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + ". Rerunning verification after 1 minute for debugging");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
            startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of verification failed with error code " + ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // Pass all remaining args from the conf with keys <test class name>.<load test tool arg>.
    String clazz = this.getClass().getSimpleName();
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(String.format("%s.%s", clazz, arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[args.size()]);
  }

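  /**
   * Builds the LoadTestTool argument list for a single phase of the test. For example
   * (placeholders, not literal values), a write phase produces something like:
   * {@code -tn <table> -families <families> -write <cols>:<size>:<threads> -start_key <start>
   * -num_keys <n> -skip_init}.
   */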
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
      long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[args.size()]);
  }

  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /** Estimates the total number of keys to load, based on the number of live region servers. */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}