/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
/**
 * A base class for tests that do something with the cluster while running {@link LoadTestTool} to
 * write and verify some data.
 */
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
  public static final char HIPHEN = '-';
  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;

  /** A soft limit on how long we should run */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";

  protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
  protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;

  protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
  protected static final int DEFAULT_NUM_WRITE_THREADS = 20;

  protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
  protected static final int DEFAULT_NUM_READ_THREADS = 20;

  // LOG is used in IntegrationTestIngestWithEncryption, hence it is protected
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);

  protected IntegrationTestingUtility util;
  protected HBaseCluster cluster;
  protected LoadTestTool loadTool;

  protected String[] LOAD_TEST_TOOL_INIT_ARGS =
    { LoadTestTool.OPT_COLUMN_FAMILIES, LoadTestTool.OPT_COMPRESSION,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestTool.OPT_INMEMORY, LoadTestTool.OPT_ENCRYPTION,
      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, LoadTestTool.OPT_REGION_REPLICATION, };
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
    util.initializeCluster(getMinServerCount());
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();
    loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());
    // Initialize load test tool before we start breaking things;
    // LoadTestTool init, even when it is a no-op, is very fragile.
    initTable();
  }

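  /** Returns the minimum number of region servers this test requires in the cluster. */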
  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

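  /**
   * Creates the test table by running {@link LoadTestTool} in init-only mode and fails the test
   * if the tool exits with a non-zero code.
   */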
  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

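  /** JUnit entry point; runs a shortened ingest cycle with fixed record and thread settings. */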
  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

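  /**
   * Reads per-test-class overrides (keys per server, write and read thread counts) from the
   * configuration and runs the ingest loop for the given run time.
   */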
  protected void internalRunIngestTest(long runTime) throws Exception {
    String clazz = this.getClass().getSimpleName();
    long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
      DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName
      .valueOf(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    String clazz = this.getClass().getSimpleName();
    // Parse the column family names from the conf because LTT is not initialized yet.
    String familiesString =
      getConf().get(String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      for (String family : Splitter.on(',').split(familiesString)) {
        families.add(family);
      }
    }

    return families;
  }

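  /** Drops the test table if it already exists so the run starts from a clean state. */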
  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

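  /**
   * Runs the ingest loop: load, update and read/verify phases are repeated until roughly 90% of
   * the configured run time has elapsed. A failed verification is retried once after a one minute
   * pause before the test is failed.
   * @param defaultRunTime run time in milliseconds, unless overridden via {@link #RUN_TIME_KEY}
   * @param keysPerServerPerIter number of keys to load per region server in each iteration
   * @param colsPerKey number of columns written per key
   * @param recordSize size in bytes of each written value
   * @param writeThreads number of writer/updater threads
   * @param readThreads number of reader threads
   */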
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {

    LOG.info("Running ingest");
    LOG.info("Cluster size:"
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) {
      LOG.info("Intended run time: " + (runtime / 60000) + " min, left:"
        + ((runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000) + " min");

      int ret = loadTool.run(getArgsForLoadTestTool("-write",
        String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Load failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Update failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(
          getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of verification failed with error code " + ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

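  /**
   * Builds the {@link LoadTestTool} argument list used to create the table: the table name, any
   * per-test-class options found in the configuration, and {@code -init_only}.
   */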
  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // pass all remaining args from conf with keys <test class name>.<load test tool arg>
    String clazz = this.getClass().getSimpleName();
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(String.format("%s.%s", clazz, arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[args.size()]);
  }

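  /**
   * Builds the {@link LoadTestTool} argument list for a single phase (write, update or read) over
   * the given key range, skipping table initialization.
   */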
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[args.size()]);
  }

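  /** Returns the column families as a comma-separated string for the {@code -families} option. */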
  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /** Estimates a data size based on the cluster size */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }

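  /** Command-line entry point; runs the test against a distributed cluster via {@link ToolRunner}. */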
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}