001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Locale;
025import java.util.Set;
026import org.apache.commons.lang3.StringUtils;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
029import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
030import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
031import org.apache.hadoop.hbase.regionserver.HStore;
032import org.apache.hadoop.hbase.regionserver.StoreEngine;
033import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
034import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
035import org.apache.hadoop.hbase.util.AbstractHBaseTool;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
039import org.apache.hadoop.hbase.util.MultiThreadedAction;
040import org.apache.hadoop.hbase.util.MultiThreadedReader;
041import org.apache.hadoop.hbase.util.MultiThreadedWriter;
042import org.apache.hadoop.hbase.util.RegionSplitter;
043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
049import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
051
052/**
053 * A perf test which does large data ingestion using stripe compactions and regular compactions.
054 */
055@InterfaceAudience.Private
056public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
057  private static final Logger LOG =
058    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);
059
060  private static final TableName TABLE_NAME =
061    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
062  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
063  private static final int MIN_NUM_SERVERS = 1;
064
065  // Option names.
066  private static final String DATAGEN_KEY = "datagen";
067  private static final String ITERATIONS_KEY = "iters";
068  private static final String PRELOAD_COUNT_KEY = "pwk";
069  private static final String WRITE_COUNT_KEY = "wk";
070  private static final String WRITE_THREADS_KEY = "wt";
071  private static final String READ_THREADS_KEY = "rt";
072  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
073  private static final String SPLIT_SIZE_KEY = "splitsize";
074  private static final String SPLIT_PARTS_KEY = "splitparts";
075  private static final String VALUE_SIZE_KEY = "valsize";
076  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
077
078  // Option values.
079  private LoadTestDataGenerator dataGen;
080  private int iterationCount;
081  private long preloadKeys;
082  private long writeKeys;
083  private int writeThreads;
084  private int readThreads;
085  private Long initialStripeCount;
086  private Long splitSize;
087  private Long splitParts;
088
089  private static final String VALUE_SIZE_DEFAULT = "512:4096";
090
091  protected IntegrationTestingUtility util = new IntegrationTestingUtility();
092
093  @Override
094  protected void addOptions() {
095    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
096    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
097      + " sequences. The number of such shards per server is specified (default is 1).");
098    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
099    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
100    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
101    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
102    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
103    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
104    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
105    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
106    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
107      + " default " + VALUE_SIZE_DEFAULT);
108  }
109
110  @Override
111  protected void processOptions(CommandLine cmd) {
112    int minValueSize = 0, maxValueSize = 0;
113    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
114    if (valueSize.contains(":")) {
115      List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
116      if (valueSizes.size() != 2) {
117        throw new RuntimeException("Invalid value size: " + valueSize);
118      }
119      minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
120      maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
121    } else {
122      minValueSize = maxValueSize = Integer.parseInt(valueSize);
123    }
124    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
125    if ("default".equals(datagen)) {
126      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
127        new byte[][] { COLUMN_FAMILY });
128    } else if ("sequential".equals(datagen)) {
129      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
130      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
131    } else {
132      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
133    }
134    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
135    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
136    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
137    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
138    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
139    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
140    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
141    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
142  }
143
144  private Long getLongOrNull(CommandLine cmd, String option) {
145    if (!cmd.hasOption(option)) return null;
146    return Long.parseLong(cmd.getOptionValue(option));
147  }
148
149  @Override
150  public Configuration getConf() {
151    Configuration c = super.getConf();
152    if (c == null && util != null) {
153      conf = util.getConfiguration();
154      c = conf;
155    }
156    return c;
157  }
158
159  @Override
160  protected int doWork() throws Exception {
161    setUp();
162    try {
163      boolean isStripe = true;
164      for (int i = 0; i < iterationCount * 2; ++i) {
165        createTable(isStripe);
166        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
167        isStripe = !isStripe;
168      }
169      return 0;
170    } finally {
171      tearDown();
172    }
173  }
174
175  private void setUp() throws Exception {
176    this.util = new IntegrationTestingUtility();
177    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
178    util.initializeCluster(MIN_NUM_SERVERS);
179    LOG.debug("Done initializing/checking cluster");
180  }
181
182  protected void deleteTable() throws Exception {
183    if (util.getAdmin().tableExists(TABLE_NAME)) {
184      LOG.info("Deleting table");
185      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
186        util.getAdmin().disableTable(TABLE_NAME);
187      }
188      util.getAdmin().deleteTable(TABLE_NAME);
189      LOG.info("Deleted table");
190    }
191  }
192
193  private void createTable(boolean isStripe) throws Exception {
194    createTable(createHtd(isStripe));
195  }
196
197  private void tearDown() throws Exception {
198    deleteTable();
199    LOG.info("Restoring the cluster");
200    util.restoreCluster();
201    LOG.info("Done restoring the cluster");
202  }
203
204  private void runOneTest(String description, Configuration conf) throws Exception {
205    int numServers =
206      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
207    long startKey = preloadKeys * numServers;
208    long endKey = startKey + writeKeys * numServers;
209    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
210      description, numServers, startKey, endKey));
211
212    if (preloadKeys > 0) {
213      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
214      long time = EnvironmentEdgeManager.currentTime();
215      preloader.start(0, startKey, writeThreads);
216      preloader.waitForFinish();
217      if (preloader.getNumWriteFailures() > 0) {
218        throw new IOException("Preload failed");
219      }
220      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
221      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
222        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
223      Thread.sleep(waitTime);
224    }
225
226    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
227    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
228    // reader.getMetrics().enable();
229    reader.linkToWriter(writer);
230
231    long testStartTime = EnvironmentEdgeManager.currentTime();
232    writer.start(startKey, endKey, writeThreads);
233    reader.start(startKey, endKey, readThreads);
234    writer.waitForFinish();
235    reader.waitForFinish();
236    // reader.waitForVerification(300000);
237    // reader.abortAndWaitForFinish();
238    status("Readers and writers stopped for test " + description);
239
240    boolean success = writer.getNumWriteFailures() == 0;
241    if (!success) {
242      LOG.error("Write failed");
243    } else {
244      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
245      if (!success) {
246        LOG.error("Read failed");
247      }
248    }
249
250    // Dump perf regardless of the result.
251    /*
252     * StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt :
253     * reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n",
254     * description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long,
255     * Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while
256     * (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next();
257     * perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(),
258     * pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description +
259     * " test: \n" + perfDump.toString());
260     */
261    status(description + " test took "
262      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
263    assertTrue(success);
264  }
265
266  private static void status(String s) {
267    LOG.info("STATUS " + s);
268    System.out.println(s);
269  }
270
271  private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception {
272    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME)
273      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
274    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
275    builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
276    if (isStripe) {
277      builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
278      if (initialStripeCount != null) {
279        builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
280        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
281      } else {
282        builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500");
283      }
284      if (splitSize != null) {
285        builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
286      }
287      if (splitParts != null) {
288        builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
289      }
290    } else {
291      builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
292    }
293    return builder;
294  }
295
296  private void createTable(TableDescriptorBuilder builder) throws Exception {
297    deleteTable();
298    if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) {
299      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
300      builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
301    }
302    byte[][] splits = new RegionSplitter.HexStringSplit()
303      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
304    util.getAdmin().createTable(builder.build(), splits);
305  }
306
307  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
308    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
309    private static final int PAD_TO = 10;
310    private static final int PREFIX_PAD_TO = 7;
311
312    private final int numPartitions;
313
314    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
315      super(minValueSize, maxValueSize);
316      this.numPartitions = numPartitions;
317    }
318
319    @Override
320    public byte[] getDeterministicUniqueKey(long keyBase) {
321      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
322      return Bytes.toBytes(getPrefix(keyBase) + num);
323    }
324
325    private String getPrefix(long i) {
326      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
327    }
328
329    @Override
330    public byte[][] getColumnFamilies() {
331      return new byte[][] { COLUMN_FAMILY };
332    }
333
334    @Override
335    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
336      return COLUMN_NAMES;
337    }
338
339    @Override
340    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
341      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
342    }
343
344    @Override
345    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
346      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
347    }
348
349    @Override
350    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
351      return true;
352    }
353  }
354}