/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
      LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
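
  // Example invocation (a sketch; the options above are parsed by AbstractHBaseTool via
  // commons-cli, and the exact launcher depends on the deployment):
  //   hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
  //       -datagen sequential -iters 2 -pwk 1000000 -wk 1000000 -wt 10 -rt 20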

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
        + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
        + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      String[] valueSizes = valueSize.split(":");
      // Validate the split array, not the raw string: a valid range has exactly two parts.
      if (valueSizes.length != 2) throw new RuntimeException("Invalid value size: " + valueSize);
      minValueSize = Integer.parseInt(valueSizes[0]);
      maxValueSize = Integer.parseInt(valueSizes[1]);
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(
          minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

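  /** Returns the option's value as a Long, or null when the option was not supplied. */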
  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) return null;
    return Long.parseLong(cmd.getOptionValue(option));
  }

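  // The tool may be asked for a configuration before one has been set explicitly; fall back to
  // the IntegrationTestingUtility's configuration and cache it in the inherited field.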
  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
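      // Run 2 * iterationCount passes, alternating between the stripe store engine and the
      // default one, so both engines see the same workload the same number of times.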
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

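  /** Disables the test table if needed and deletes it; a no-op if the table does not exist. */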
  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

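  /**
   * Runs one workload pass: optionally preloads {@code preloadKeys} keys per server, waits for
   * the store to stabilize, then runs concurrent writers and readers over the next
   * {@code writeKeys} keys per server, failing the test on any write or read error.
   */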
  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers = util.getHBaseClusterInterface()
      .getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
        description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = System.currentTimeMillis();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (System.currentTimeMillis() - time) / 1000
          + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = System.currentTimeMillis();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*StringBuilder perfDump = new StringBuilder();
    for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
      perfDump.append(String.format(
          "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
    }
    if (dumpTimePerf) {
      Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
      while (timePerf.hasNext()) {
        Triple<Long, Double, Long> pt = timePerf.next();
        perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
            description, pt.getFirst(), pt.getThird(), pt.getSecond()));
      }
    }
    LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());*/
    status(description + " test took "
        + (System.currentTimeMillis() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

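  /**
   * Builds the table descriptor: region splitting is disabled for both engines; for stripe runs
   * the stripe store engine is configured and the blocking store-file limit is raised so
   * compaction pressure does not block the loaders.
   */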
  private HTableDescriptor createHtd(boolean isStripe) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        htd.setConfiguration(
            StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
        htd.setConfiguration(
            HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return htd;
  }

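  /** Recreates the table, pre-split into one region per live server via hex-string split points. */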
  protected void createTable(HTableDescriptor htd) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit().split(
        util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(htd, splits);
  }

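  /**
   * Data generator that shards a sequential key space: each key is a zero-padded shard prefix
   * (the sequence number modulo the number of partitions) followed by the zero-padded sequence
   * number itself, so every shard produces its own ascending run of keys.
   */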
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}