001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Locale; 025import java.util.Set; 026import org.apache.commons.lang3.StringUtils; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 029import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 030import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 031import org.apache.hadoop.hbase.regionserver.HStore; 032import org.apache.hadoop.hbase.regionserver.StoreEngine; 033import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 034import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 035import org.apache.hadoop.hbase.util.AbstractHBaseTool; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.LoadTestKVGenerator; 039import org.apache.hadoop.hbase.util.MultiThreadedAction; 040import org.apache.hadoop.hbase.util.MultiThreadedReader; 041import org.apache.hadoop.hbase.util.MultiThreadedWriter; 042import org.apache.hadoop.hbase.util.RegionSplitter; 043import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 049import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 050import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 051 052/** 053 * A perf test which does large data ingestion using stripe compactions and regular compactions. 054 */ 055@InterfaceAudience.Private 056public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool { 057 private static final Logger LOG = 058 LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class); 059 060 private static final TableName TABLE_NAME = 061 TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName()); 062 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF"); 063 private static final int MIN_NUM_SERVERS = 1; 064 065 // Option names. 066 private static final String DATAGEN_KEY = "datagen"; 067 private static final String ITERATIONS_KEY = "iters"; 068 private static final String PRELOAD_COUNT_KEY = "pwk"; 069 private static final String WRITE_COUNT_KEY = "wk"; 070 private static final String WRITE_THREADS_KEY = "wt"; 071 private static final String READ_THREADS_KEY = "rt"; 072 private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes"; 073 private static final String SPLIT_SIZE_KEY = "splitsize"; 074 private static final String SPLIT_PARTS_KEY = "splitparts"; 075 private static final String VALUE_SIZE_KEY = "valsize"; 076 private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards"; 077 078 // Option values. 079 private LoadTestDataGenerator dataGen; 080 private int iterationCount; 081 private long preloadKeys; 082 private long writeKeys; 083 private int writeThreads; 084 private int readThreads; 085 private Long initialStripeCount; 086 private Long splitSize; 087 private Long splitParts; 088 089 private static final String VALUE_SIZE_DEFAULT = "512:4096"; 090 091 protected IntegrationTestingUtility util = new IntegrationTestingUtility(); 092 093 @Override 094 protected void addOptions() { 095 addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)"); 096 addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many" 097 + " sequences. The number of such shards per server is specified (default is 1)."); 098 addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare"); 099 addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server"); 100 addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server"); 101 addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing"); 102 addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading"); 103 addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially"); 104 addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes"); 105 addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits"); 106 addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;" 107 + " default " + VALUE_SIZE_DEFAULT); 108 } 109 110 @Override 111 protected void processOptions(CommandLine cmd) { 112 int minValueSize = 0, maxValueSize = 0; 113 String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT); 114 if (valueSize.contains(":")) { 115 List<String> valueSizes = Splitter.on(':').splitToList(valueSize); 116 if (valueSizes.size() != 2) { 117 throw new RuntimeException("Invalid value size: " + valueSize); 118 } 119 minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0)); 120 maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1)); 121 } else { 122 minValueSize = maxValueSize = Integer.parseInt(valueSize); 123 } 124 String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT); 125 if ("default".equals(datagen)) { 126 dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1, 127 new byte[][] { COLUMN_FAMILY }); 128 } else if ("sequential".equals(datagen)) { 129 int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1")); 130 dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards); 131 } else { 132 throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen); 133 } 134 iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1")); 135 preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000")); 136 writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000")); 137 writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10")); 138 readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20")); 139 initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY); 140 splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY); 141 splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY); 142 } 143 144 private Long getLongOrNull(CommandLine cmd, String option) { 145 if (!cmd.hasOption(option)) return null; 146 return Long.parseLong(cmd.getOptionValue(option)); 147 } 148 149 @Override 150 public Configuration getConf() { 151 Configuration c = super.getConf(); 152 if (c == null && util != null) { 153 conf = util.getConfiguration(); 154 c = conf; 155 } 156 return c; 157 } 158 159 @Override 160 protected int doWork() throws Exception { 161 setUp(); 162 try { 163 boolean isStripe = true; 164 for (int i = 0; i < iterationCount * 2; ++i) { 165 createTable(isStripe); 166 runOneTest((isStripe ? "Stripe" : "Default") + i, conf); 167 isStripe = !isStripe; 168 } 169 return 0; 170 } finally { 171 tearDown(); 172 } 173 } 174 175 private void setUp() throws Exception { 176 this.util = new IntegrationTestingUtility(); 177 LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers"); 178 util.initializeCluster(MIN_NUM_SERVERS); 179 LOG.debug("Done initializing/checking cluster"); 180 } 181 182 protected void deleteTable() throws Exception { 183 if (util.getAdmin().tableExists(TABLE_NAME)) { 184 LOG.info("Deleting table"); 185 if (!util.getAdmin().isTableDisabled(TABLE_NAME)) { 186 util.getAdmin().disableTable(TABLE_NAME); 187 } 188 util.getAdmin().deleteTable(TABLE_NAME); 189 LOG.info("Deleted table"); 190 } 191 } 192 193 private void createTable(boolean isStripe) throws Exception { 194 createTable(createHtd(isStripe)); 195 } 196 197 private void tearDown() throws Exception { 198 deleteTable(); 199 LOG.info("Restoring the cluster"); 200 util.restoreCluster(); 201 LOG.info("Done restoring the cluster"); 202 } 203 204 private void runOneTest(String description, Configuration conf) throws Exception { 205 int numServers = 206 util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size(); 207 long startKey = preloadKeys * numServers; 208 long endKey = startKey + writeKeys * numServers; 209 status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", 210 description, numServers, startKey, endKey)); 211 212 if (preloadKeys > 0) { 213 MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); 214 long time = EnvironmentEdgeManager.currentTime(); 215 preloader.start(0, startKey, writeThreads); 216 preloader.waitForFinish(); 217 if (preloader.getNumWriteFailures() > 0) { 218 throw new IOException("Preload failed"); 219 } 220 int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary 221 status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000 222 + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize"); 223 Thread.sleep(waitTime); 224 } 225 226 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); 227 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); 228 // reader.getMetrics().enable(); 229 reader.linkToWriter(writer); 230 231 long testStartTime = EnvironmentEdgeManager.currentTime(); 232 writer.start(startKey, endKey, writeThreads); 233 reader.start(startKey, endKey, readThreads); 234 writer.waitForFinish(); 235 reader.waitForFinish(); 236 // reader.waitForVerification(300000); 237 // reader.abortAndWaitForFinish(); 238 status("Readers and writers stopped for test " + description); 239 240 boolean success = writer.getNumWriteFailures() == 0; 241 if (!success) { 242 LOG.error("Write failed"); 243 } else { 244 success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; 245 if (!success) { 246 LOG.error("Read failed"); 247 } 248 } 249 250 // Dump perf regardless of the result. 251 /* 252 * StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : 253 * reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", 254 * description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, 255 * Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while 256 * (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); 257 * perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), 258 * pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + 259 * " test: \n" + perfDump.toString()); 260 */ 261 status(description + " test took " 262 + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec"); 263 assertTrue(success); 264 } 265 266 private static void status(String s) { 267 LOG.info("STATUS " + s); 268 System.out.println(s); 269 } 270 271 private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception { 272 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME) 273 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)); 274 String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName(); 275 builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy); 276 if (isStripe) { 277 builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); 278 if (initialStripeCount != null) { 279 builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString()); 280 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount)); 281 } else { 282 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500"); 283 } 284 if (splitSize != null) { 285 builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString()); 286 } 287 if (splitParts != null) { 288 builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString()); 289 } 290 } else { 291 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default 292 } 293 return builder; 294 } 295 296 private void createTable(TableDescriptorBuilder builder) throws Exception { 297 deleteTable(); 298 if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) { 299 LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low."); 300 builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576"); 301 } 302 byte[][] splits = new RegionSplitter.HexStringSplit() 303 .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size()); 304 util.getAdmin().createTable(builder.build(), splits); 305 } 306 307 public static class SeqShardedDataGenerator extends LoadTestDataGenerator { 308 private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") }; 309 private static final int PAD_TO = 10; 310 private static final int PREFIX_PAD_TO = 7; 311 312 private final int numPartitions; 313 314 public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) { 315 super(minValueSize, maxValueSize); 316 this.numPartitions = numPartitions; 317 } 318 319 @Override 320 public byte[] getDeterministicUniqueKey(long keyBase) { 321 String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0"); 322 return Bytes.toBytes(getPrefix(keyBase) + num); 323 } 324 325 private String getPrefix(long i) { 326 return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0"); 327 } 328 329 @Override 330 public byte[][] getColumnFamilies() { 331 return new byte[][] { COLUMN_FAMILY }; 332 } 333 334 @Override 335 public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) { 336 return COLUMN_NAMES; 337 } 338 339 @Override 340 public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) { 341 return kvGenerator.generateRandomSizeValue(rowKey, cf, column); 342 } 343 344 @Override 345 public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) { 346 return LoadTestKVGenerator.verify(value, rowKey, cf, column); 347 } 348 349 @Override 350 public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) { 351 return true; 352 } 353 } 354}