001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.Locale; 022import java.util.Set; 023import org.apache.commons.lang3.StringUtils; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 026import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 027import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 028import org.apache.hadoop.hbase.regionserver.HStore; 029import org.apache.hadoop.hbase.regionserver.StoreEngine; 030import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; 031import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; 032import org.apache.hadoop.hbase.util.AbstractHBaseTool; 033import org.apache.hadoop.hbase.util.Bytes; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.util.LoadTestKVGenerator; 036import org.apache.hadoop.hbase.util.MultiThreadedAction; 037import org.apache.hadoop.hbase.util.MultiThreadedReader; 038import org.apache.hadoop.hbase.util.MultiThreadedWriter; 039import org.apache.hadoop.hbase.util.RegionSplitter; 040import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator; 041import org.apache.yetus.audience.InterfaceAudience; 042import org.junit.Assert; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 047 048/** 049 * A perf test which does large data ingestion using stripe compactions and regular compactions. 050 */ 051@InterfaceAudience.Private 052public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool { 053 private static final Logger LOG = 054 LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class); 055 056 private static final TableName TABLE_NAME = 057 TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName()); 058 private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF"); 059 private static final int MIN_NUM_SERVERS = 1; 060 061 // Option names. 062 private static final String DATAGEN_KEY = "datagen"; 063 private static final String ITERATIONS_KEY = "iters"; 064 private static final String PRELOAD_COUNT_KEY = "pwk"; 065 private static final String WRITE_COUNT_KEY = "wk"; 066 private static final String WRITE_THREADS_KEY = "wt"; 067 private static final String READ_THREADS_KEY = "rt"; 068 private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes"; 069 private static final String SPLIT_SIZE_KEY = "splitsize"; 070 private static final String SPLIT_PARTS_KEY = "splitparts"; 071 private static final String VALUE_SIZE_KEY = "valsize"; 072 private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards"; 073 074 // Option values. 075 private LoadTestDataGenerator dataGen; 076 private int iterationCount; 077 private long preloadKeys; 078 private long writeKeys; 079 private int writeThreads; 080 private int readThreads; 081 private Long initialStripeCount; 082 private Long splitSize; 083 private Long splitParts; 084 085 private static final String VALUE_SIZE_DEFAULT = "512:4096"; 086 087 protected IntegrationTestingUtility util = new IntegrationTestingUtility(); 088 089 @Override 090 protected void addOptions() { 091 addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)"); 092 addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many" 093 + " sequences. The number of such shards per server is specified (default is 1)."); 094 addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare"); 095 addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server"); 096 addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server"); 097 addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing"); 098 addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading"); 099 addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially"); 100 addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes"); 101 addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits"); 102 addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;" 103 + " default " + VALUE_SIZE_DEFAULT); 104 } 105 106 @Override 107 protected void processOptions(CommandLine cmd) { 108 int minValueSize = 0, maxValueSize = 0; 109 String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT); 110 if (valueSize.contains(":")) { 111 String[] valueSizes = valueSize.split(":"); 112 if (valueSize.length() != 2) throw new RuntimeException("Invalid value size: " + valueSize); 113 minValueSize = Integer.parseInt(valueSizes[0]); 114 maxValueSize = Integer.parseInt(valueSizes[1]); 115 } else { 116 minValueSize = maxValueSize = Integer.parseInt(valueSize); 117 } 118 String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT); 119 if ("default".equals(datagen)) { 120 dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1, 121 new byte[][] { COLUMN_FAMILY }); 122 } else if ("sequential".equals(datagen)) { 123 int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1")); 124 dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards); 125 } else { 126 throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen); 127 } 128 iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1")); 129 preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000")); 130 writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000")); 131 writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10")); 132 readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20")); 133 initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY); 134 splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY); 135 splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY); 136 } 137 138 private Long getLongOrNull(CommandLine cmd, String option) { 139 if (!cmd.hasOption(option)) return null; 140 return Long.parseLong(cmd.getOptionValue(option)); 141 } 142 143 @Override 144 public Configuration getConf() { 145 Configuration c = super.getConf(); 146 if (c == null && util != null) { 147 conf = util.getConfiguration(); 148 c = conf; 149 } 150 return c; 151 } 152 153 @Override 154 protected int doWork() throws Exception { 155 setUp(); 156 try { 157 boolean isStripe = true; 158 for (int i = 0; i < iterationCount * 2; ++i) { 159 createTable(isStripe); 160 runOneTest((isStripe ? "Stripe" : "Default") + i, conf); 161 isStripe = !isStripe; 162 } 163 return 0; 164 } finally { 165 tearDown(); 166 } 167 } 168 169 private void setUp() throws Exception { 170 this.util = new IntegrationTestingUtility(); 171 LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers"); 172 util.initializeCluster(MIN_NUM_SERVERS); 173 LOG.debug("Done initializing/checking cluster"); 174 } 175 176 protected void deleteTable() throws Exception { 177 if (util.getAdmin().tableExists(TABLE_NAME)) { 178 LOG.info("Deleting table"); 179 if (!util.getAdmin().isTableDisabled(TABLE_NAME)) { 180 util.getAdmin().disableTable(TABLE_NAME); 181 } 182 util.getAdmin().deleteTable(TABLE_NAME); 183 LOG.info("Deleted table"); 184 } 185 } 186 187 private void createTable(boolean isStripe) throws Exception { 188 createTable(createHtd(isStripe)); 189 } 190 191 private void tearDown() throws Exception { 192 deleteTable(); 193 LOG.info("Restoring the cluster"); 194 util.restoreCluster(); 195 LOG.info("Done restoring the cluster"); 196 } 197 198 private void runOneTest(String description, Configuration conf) throws Exception { 199 int numServers = 200 util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size(); 201 long startKey = preloadKeys * numServers; 202 long endKey = startKey + writeKeys * numServers; 203 status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d", 204 description, numServers, startKey, endKey)); 205 206 if (preloadKeys > 0) { 207 MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); 208 long time = EnvironmentEdgeManager.currentTime(); 209 preloader.start(0, startKey, writeThreads); 210 preloader.waitForFinish(); 211 if (preloader.getNumWriteFailures() > 0) { 212 throw new IOException("Preload failed"); 213 } 214 int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary 215 status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000 216 + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize"); 217 Thread.sleep(waitTime); 218 } 219 220 MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME); 221 MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100); 222 // reader.getMetrics().enable(); 223 reader.linkToWriter(writer); 224 225 long testStartTime = EnvironmentEdgeManager.currentTime(); 226 writer.start(startKey, endKey, writeThreads); 227 reader.start(startKey, endKey, readThreads); 228 writer.waitForFinish(); 229 reader.waitForFinish(); 230 // reader.waitForVerification(300000); 231 // reader.abortAndWaitForFinish(); 232 status("Readers and writers stopped for test " + description); 233 234 boolean success = writer.getNumWriteFailures() == 0; 235 if (!success) { 236 LOG.error("Write failed"); 237 } else { 238 success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0; 239 if (!success) { 240 LOG.error("Read failed"); 241 } 242 } 243 244 // Dump perf regardless of the result. 245 /* 246 * StringBuilder perfDump = new StringBuilder(); for (Pair<Long, Long> pt : 247 * reader.getMetrics().getCombinedCdf()) { perfDump.append(String.format( "csvread,%s,%d,%d%n", 248 * description, pt.getFirst(), pt.getSecond())); } if (dumpTimePerf) { Iterator<Triple<Long, 249 * Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries(); while 250 * (timePerf.hasNext()) { Triple<Long, Double, Long> pt = timePerf.next(); 251 * perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n", description, pt.getFirst(), 252 * pt.getThird(), pt.getSecond())); } } LOG.info("Performance data dump for " + description + 253 * " test: \n" + perfDump.toString()); 254 */ 255 status(description + " test took " 256 + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec"); 257 Assert.assertTrue(success); 258 } 259 260 private static void status(String s) { 261 LOG.info("STATUS " + s); 262 System.out.println(s); 263 } 264 265 private TableDescriptorBuilder createHtd(boolean isStripe) throws Exception { 266 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME) 267 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY)); 268 String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName(); 269 builder.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy); 270 if (isStripe) { 271 builder.setValue(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName()); 272 if (initialStripeCount != null) { 273 builder.setValue(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString()); 274 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount)); 275 } else { 276 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "500"); 277 } 278 if (splitSize != null) { 279 builder.setValue(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString()); 280 } 281 if (splitParts != null) { 282 builder.setValue(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString()); 283 } 284 } else { 285 builder.setValue(HStore.BLOCKING_STOREFILES_KEY, "10"); // default 286 } 287 return builder; 288 } 289 290 private void createTable(TableDescriptorBuilder builder) throws Exception { 291 deleteTable(); 292 if (util.getHBaseClusterInterface() instanceof SingleProcessHBaseCluster) { 293 LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low."); 294 builder.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576"); 295 } 296 byte[][] splits = new RegionSplitter.HexStringSplit() 297 .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size()); 298 util.getAdmin().createTable(builder.build(), splits); 299 } 300 301 public static class SeqShardedDataGenerator extends LoadTestDataGenerator { 302 private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") }; 303 private static final int PAD_TO = 10; 304 private static final int PREFIX_PAD_TO = 7; 305 306 private final int numPartitions; 307 308 public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) { 309 super(minValueSize, maxValueSize); 310 this.numPartitions = numPartitions; 311 } 312 313 @Override 314 public byte[] getDeterministicUniqueKey(long keyBase) { 315 String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0"); 316 return Bytes.toBytes(getPrefix(keyBase) + num); 317 } 318 319 private String getPrefix(long i) { 320 return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0"); 321 } 322 323 @Override 324 public byte[][] getColumnFamilies() { 325 return new byte[][] { COLUMN_FAMILY }; 326 } 327 328 @Override 329 public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) { 330 return COLUMN_NAMES; 331 } 332 333 @Override 334 public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) { 335 return kvGenerator.generateRandomSizeValue(rowKey, cf, column); 336 } 337 338 @Override 339 public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) { 340 return LoadTestKVGenerator.verify(value, rowKey, cf, column); 341 } 342 343 @Override 344 public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) { 345 return true; 346 } 347 } 348}