/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A performance test that does large data ingestion and compares stripe compactions against
 * regular compactions.
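 * <p>
 * Illustrative invocation (a sketch only; the exact launcher wiring depends on the deployment,
 * and the flag names below are the option keys registered in {@code addOptions()}):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
 *   -iters 2 -pwk 1000000 -wk 1000000 -wt 10 -rt 20 -valsize 512:4096
 * </pre>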
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
      + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
      if (valueSizes.size() != 2) {
        throw new RuntimeException("Invalid value size: " + valueSize);
      }
      minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
      maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
        new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) {
      return null;
    }
    return Long.parseLong(cmd.getOptionValue(option));
  }

  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    // Fall back to the integration test utility's configuration if none has been set yet.
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      // Alternate between the stripe and default store engines so that each
      // gets iterationCount runs and the results can be compared.
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = EnvironmentEdgeManager.currentTime();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

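    // Main phase: writers insert the remaining keys while readers, linked to the writers'
    // progress, concurrently read back and verify what has already been written.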
    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = EnvironmentEdgeManager.currentTime();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
     * StringBuilder perfDump = new StringBuilder();
     * for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
     *   perfDump.append(String.format("csvread,%s,%d,%d%n", description, pt.getFirst(),
     *     pt.getSecond()));
     * }
     * if (dumpTimePerf) {
     *   Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
     *   while (timePerf.hasNext()) {
     *     Triple<Long, Double, Long> pt = timePerf.next();
     *     perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(),
     *       pt.getThird(), pt.getSecond()));
     *   }
     * }
     * LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
     */
    status(description + " test took "
      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

  private HTableDescriptor createHtd(boolean isStripe) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        htd.setConfiguration(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY,
          initialStripeCount.toString());
        // Scale the blocking store files limit with the stripe count, since each stripe
        // keeps its own set of files.
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY,
          Long.toString(10 * initialStripeCount));
      } else {
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return htd;
  }

  protected void createTable(HTableDescriptor htd) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit()
      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(htd, splits);
  }

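  /**
   * Data generator that shards a sequential key space across {@code numPartitions} ranges. Each
   * key is a zero-padded partition prefix (keyBase modulo the partition count) followed by the
   * zero-padded sequence number; for example, with 3 partitions, keyBase 43 yields
   * "0000001" + "0000000043".
   */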
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}