/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
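 * <p>
 * A sketch of one possible invocation, using the option names this tool registers in
 * {@code addOptions()}; the {@code hbase} launcher line is an assumption about how
 * {@code AbstractHBaseTool} subclasses are typically run, not something this class mandates:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
 *     -iters 2 -pwk 200000 -wk 200000 -wt 10 -rt 20 -valsize 512:4096
 * </pre>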
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
      + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      String[] valueSizes = valueSize.split(":");
      // Validate the split result, not the raw string: a range must be exactly "min:max".
      if (valueSizes.length != 2) {
        throw new RuntimeException("Invalid value size: " + valueSize);
      }
      minValueSize = Integer.parseInt(valueSizes[0]);
      maxValueSize = Integer.parseInt(valueSizes[1]);
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
        new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) {
      return null;
    }
    return Long.parseLong(cmd.getOptionValue(option));
  }

  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    // Fall back to the integration testing utility's configuration when the tool has none yet.
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
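      // Alternate the store engine between stripe and default: iterationCount * 2 runs in
      // total, so each iteration contributes one timed run of each engine for comparison.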
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }

  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
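    // The preload phase covers keys [0, startKey); the timed phase writes [startKey, endKey).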
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = EnvironmentEdgeManager.currentTime();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);
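    // Linking the reader to the writer lets reads chase the writer's progress and verify keys
    // shortly after they are written, instead of reading a fixed, pre-populated range.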

    long testStartTime = EnvironmentEdgeManager.currentTime();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
     * StringBuilder perfDump = new StringBuilder();
     * for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
     *   perfDump.append(String.format("csvread,%s,%d,%d%n", description, pt.getFirst(),
     *     pt.getSecond()));
     * }
     * if (dumpTimePerf) {
     *   Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
     *   while (timePerf.hasNext()) {
     *     Triple<Long, Double, Long> pt = timePerf.next();
     *     perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(),
     *       pt.getThird(), pt.getSecond()));
     *   }
     * }
     * LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
     */
    status(description + " test took "
      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

  private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
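    // Disable region splitting so every run executes over the same fixed region layout; with
    // the stripe engine, data growth is absorbed by stripe splits within each region instead.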
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
        // Scale the blocking-store-files limit with the stripe count so heavy ingestion into
        // many stripes is not throttled by the store file count.
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return builder;
  }

  private void createTable(TableDescriptorBuilder builder) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
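    // Pre-split the table into one region per live region server using hex-string split points.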
    byte[][] splits = new RegionSplitter.HexStringSplit()
      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(builder.build(), splits);
  }
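
  /**
   * Data generator that shards a sequential key space across a fixed number of partitions: each
   * key is a zero-padded shard prefix (keyBase modulo the partition count) followed by the
   * zero-padded sequence number, so every shard forms its own ascending run of keys.
   */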
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}