/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * A base class for tests that do something with the cluster while running
 * {@link LoadTestTool} to write and verify some data.
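 * <p>
 * For example, assuming the test classes are on the HBase classpath and a cluster is
 * configured, the test can typically be launched through its {@code main} method:
 * </p>
 * <pre>
 * bin/hbase org.apache.hadoop.hbase.IntegrationTestIngest
 * </pre>
 * <p>
 * The soft run-time limit can be overridden by setting the configuration key
 * {@code hbase.IntegrationTestIngest.runtime} (in milliseconds); see {@link #RUN_TIME_KEY}.
 * </p>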
 */
@Category(IntegrationTests.class)
public class IntegrationTestIngest extends IntegrationTestBase {
  public static final char HIPHEN = '-';
  private static final int SERVER_COUNT = 1; // number of region servers for the smallest cluster
  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
  protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;

  /**
   * A soft limit on how long we should run, in milliseconds. The %s placeholder is replaced
   * with the simple name of the test class, e.g. "hbase.IntegrationTestIngest.runtime".
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";

  protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
  protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;

  protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
  protected static final int DEFAULT_NUM_WRITE_THREADS = 20;

  protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
  protected static final int DEFAULT_NUM_READ_THREADS = 20;

  // LOG is used by IntegrationTestIngestWithEncryption, hence it is protected.
  protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestIngest.class);
  protected IntegrationTestingUtility util;
  protected HBaseClusterInterface cluster;
  protected LoadTestTool loadTool;

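  // Table-init options forwarded to LoadTestTool when a configuration key of the form
  // "<test class simple name>.<option>" is set (see getArgsForLoadTestToolInitTable()). For an
  // illustrative example, and assuming the GZ codec is available, setting
  // IntegrationTestIngest.compression=GZ results in "-compression GZ" being passed at init time.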
  protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
      LoadTestTool.OPT_COLUMN_FAMILIES,
      LoadTestTool.OPT_COMPRESSION,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
      LoadTestTool.OPT_INMEMORY,
      LoadTestTool.OPT_ENCRYPTION,
      LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
      LoadTestTool.OPT_REGION_REPLICATION,
  };

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    LOG.debug("Initializing/checking cluster has {} servers", SERVER_COUNT);
    util.initializeCluster(getMinServerCount());
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
    deleteTableIfNecessary();
    loadTool = new LoadTestTool();
    loadTool.setConf(util.getConfiguration());
    // Initialize load test tool before we start breaking things;
    // LoadTestTool init, even when it is a no-op, is very fragile.
    initTable();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected void initTable() throws IOException {
    int ret = loadTool.run(getArgsForLoadTestToolInitTable());
    Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    internalRunIngestTest(DEFAULT_RUN_TIME);
    return 0;
  }

  @Test
  public void testIngest() throws Exception {
    runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
  }

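  // The number of keys per server and the read/write thread counts used below can be overridden
  // with configuration keys of the form "<test class simple name>.<knob>", for example
  // (illustrative) -DIntegrationTestIngest.num_write_threads=40 when launched through ToolRunner.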
  protected void internalRunIngestTest(long runTime) throws Exception {
    String clazz = this.getClass().getSimpleName();
    long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
      DEFAULT_NUM_KEYS_PER_SERVER);
    int numWriteThreads = conf.getInt(
      String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
    int numReadThreads = conf.getInt(
      String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
    runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
  }

  @Override
  public TableName getTablename() {
    String clazz = this.getClass().getSimpleName();
    return TableName.valueOf(
      conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    Set<String> families = Sets.newHashSet();
    String clazz = this.getClass().getSimpleName();
    // Parse the conf for the column family names because LoadTestTool is not initialized yet.
    String familiesString = getConf().get(
      String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
    if (familiesString == null) {
      for (byte[] family : HFileTestUtil.DEFAULT_COLUMN_FAMILIES) {
        families.add(Bytes.toString(family));
      }
    } else {
      for (String family : familiesString.split(",")) {
        families.add(family);
      }
    }

    return families;
  }

  private void deleteTableIfNecessary() throws IOException {
    if (util.getAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }
  }

  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
      int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Running ingest");
    LOG.info("Cluster size: {}",
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);
    while (EnvironmentEdgeManager.currentTime() - start < 0.9 * runtime) {
      LOG.info("Intended run time: {} min, left: {} min", runtime / 60000,
        (runtime - (EnvironmentEdgeManager.currentTime() - start)) / 60000);

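      // The mode-specific arguments below follow LoadTestTool's formats as understood at the
      // time of writing: -write takes colsPerKey:recordSize[:writeThreads], -update takes
      // updatePercent[:threads[:ignoreNonceConflicts]], and -read takes
      // verifyPercent[:readThreads].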
      int ret = loadTool.run(getArgsForLoadTestTool("-write",
          String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Load failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
          startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Update failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }

      ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
          startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg + ". Rerunning verification after 1 minute for debugging.");
        Threads.sleep(1000 * 60);
        ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
            startKey, numKeys));
        if (0 != ret) {
          LOG.error("Rerun of verification failed with error code " + ret);
        }
        Assert.fail(errorMsg);
      }
      startKey += numKeys;
    }
  }

  protected String[] getArgsForLoadTestToolInitTable() {
    List<String> args = new ArrayList<>();
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    // pass all remaining args from conf with keys <test class name>.<load test tool arg>
    String clazz = this.getClass().getSimpleName();
    for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
      String val = conf.get(String.format("%s.%s", clazz, arg));
      if (val != null) {
        args.add("-" + arg);
        args.add(val);
      }
    }
    args.add("-init_only");
    return args.toArray(new String[args.size()]);
  }

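  // Builds the per-iteration LoadTestTool command line, e.g. (with illustrative placeholder
  // values):
  // -tn <table> -families <cf1,cf2> -write 10:1024:20 -start_key 0 -num_keys <n> -skip_init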
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
      long numKeys) {
    List<String> args = new ArrayList<>(11);
    args.add("-tn");
    args.add(getTablename().getNameAsString());
    args.add("-families");
    args.add(getColumnFamiliesAsString());
    args.add(mode);
    args.add(modeSpecificArg);
    args.add("-start_key");
    args.add(String.valueOf(startKey));
    args.add("-num_keys");
    args.add(String.valueOf(numKeys));
    args.add("-skip_init");

    return args.toArray(new String[args.size()]);
  }

  private String getColumnFamiliesAsString() {
    return StringUtils.join(",", getColumnFamilies());
  }

  /** Estimates the total number of keys to load, based on the number of live region servers. */
  protected long getNumKeys(long keysPerServer) throws IOException {
    int numRegionServers = cluster.getClusterMetrics().getLiveServerMetrics().size();
    return keysPerServer * numRegionServers;
  }


  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
    System.exit(ret);
  }
}