/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * A base class for tests that do something with the cluster while running {@link LoadTestTool} to
 * write and verify some data.
 */
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
  public static final char HIPHEN = '-';
  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;

  /** A soft limit on how long we should run */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";

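  // The keys below are looked up in the configuration as <test class simple name>.<key>
  // (see internalRunIngestTest), so each subclass can be tuned independently.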
  protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
  protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;

  protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
  protected static final int DEFAULT_NUM_WRITE_THREADS = 20;

  protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
  protected static final int DEFAULT_NUM_READ_THREADS = 20;

  // LOG is used in IntegrationTestIngestWithEncryption, hence it is protected
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);
  protected HBaseClusterInterface cluster;
  protected LoadTestTool loadTool;

  protected String[] LOAD_TEST_TOOL_INIT_ARGS =
    { LoadTestTool.OPT_COLUMN_FAMILIES, LoadTestTool.OPT_COMPRESSION,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING, LoadTestTool.OPT_INMEMORY, LoadTestTool.OPT_ENCRYPTION,
      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER, LoadTestTool.OPT_REGION_REPLICATION, };

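  /**
   * Brings up (or connects to) a cluster with at least {@link #getMinServerCount()} servers,
   * drops any pre-existing test table, and initializes {@link LoadTestTool} and the test table.
   */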
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
    util.initializeCluster(getMinServerCount());
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();
    loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());
    // Initialize load test tool before we start breaking things;
    // LoadTestTool init, even when it is a no-op, is very fragile.
    initTable();
  }

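  /** Returns the minimum number of servers the cluster must have for this test. */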
  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

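  /**
   * Creates the test table by running {@link LoadTestTool} with the init-only arguments and fails
   * the test if the tool returns a non-zero exit code.
   */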
  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

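  /**
   * Shorter ingest run ({@link #JUNIT_RUN_TIME}) used when this class is executed as a JUnit test.
   */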
  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

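  /**
   * Reads the per-class overrides for keys per server and read/write thread counts from the
   * configuration and delegates to {@link #runIngestTest(long, long, int, int, int, int)}.
   */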
  protected void internalRunIngestTest(long runTime) throws Exception {
    String clazz = this.getClass().getSimpleName();
    long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
      DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads =
      conf.getInt(String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

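  /**
   * The table name defaults to the simple name of the test class and can be overridden through the
   * configuration key {@code <test class simple name>.<LoadTestTool.OPT_TABLE_NAME>}.
   */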
  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName
      .valueOf(conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    String clazz = this.getClass().getSimpleName();
    // Parse the conf to get the column family names because LTT is not initialized yet.
    String familiesString =
      getConf().get(String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      for (String family : Splitter.on(',').split(familiesString)) {
        families.add(family);
      }
    }

    return families;
  }

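  /** Drops the test table if it already exists so the run starts against a fresh table. */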
  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

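  /**
   * Runs write/update/read {@link LoadTestTool} passes against the test table until roughly 90%
   * of the configured run time has elapsed, failing the test if any pass exits with a non-zero
   * code. A failed read pass is retried once after a minute, for debugging, before failing.
   */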
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {

    LOG.info("Running ingest");
    LOG.info("Cluster size:"
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) {
      LOG.info("Intended run time: " + (runtime / 60000) + " min, left:"
        + ((runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000) + " min");

      int ret = loadTool.run(getArgsForLoadTestTool("-write",
        String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Load failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Update failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(
          getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of Verification failed with error code " + ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

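  /**
   * Builds the {@link LoadTestTool} argument list used to create the table: the table name, any
   * init options supplied through the configuration as {@code <test class simple name>.<option>},
   * and {@code -init_only}.
   */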
  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // pass all remaining args from conf with keys <test class name>.<load test tool arg>
    String clazz = this.getClass().getSimpleName();
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(String.format("%s.%s", clazz, arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[args.size()]);
  }

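  /**
   * Builds the {@link LoadTestTool} argument list for one write, update, or read pass over the
   * given key range, skipping table initialization ({@code -skip_init}).
   */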
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[args.size()]);
  }

  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /** Estimates a data size based on the cluster size */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }

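  // Entry point for running against a distributed cluster; typically launched with the hbase
  // script (assuming the standard launcher), e.g. hbase org.apache.hadoop.hbase.IntegrationTestIngest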
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}