/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;
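  /*
   * A possible invocation, assuming this tool is launched through AbstractHBaseTool's runner
   * and a suitable main entry point (the flag values below are illustrative only):
   *
   *   hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
   *     -datagen sequential -seqshards 3 -iters 2 -pwk 1000000 -wk 1000000 \
   *     -wt 10 -rt 20 -initstripes 5 -valsize 512:4096
   *
   * Each iteration runs the workload twice, once against a table using the stripe store engine
   * and once against a table using the default store engine, so the results can be compared.
   */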
  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";

  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
      + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }
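  /*
   * Parses the command line. For example (illustrative flags), "-valsize 512:4096" generates
   * values with sizes drawn from the [512, 4096] byte range, while a plain "-valsize 1024"
   * fixes the value size at 1024 bytes.
   */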
  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      List<String> valueSizes = Splitter.on(':').splitToList(valueSize);
      if (valueSizes.size() != 2) {
        throw new RuntimeException("Invalid value size: " + valueSize);
      }
      minValueSize = Integer.parseInt(Iterables.get(valueSizes, 0));
      maxValueSize = Integer.parseInt(Iterables.get(valueSizes, 1));
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minValueSize, maxValueSize, 1, 1,
        new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) {
      return null;
    }
    return Long.parseLong(cmd.getOptionValue(option));
  }

  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }
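  /*
   * The key ranges below scale with the cluster size. For example (illustrative numbers), with
   * 3 live region servers, "-pwk 1000000 -wk 1000000" preloads keys 0..3,000,000 and then has
   * the timed phase write keys 3,000,000..6,000,000 while readers verify against the linked
   * writer.
   */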
  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers =
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = EnvironmentEdgeManager.currentTime();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (EnvironmentEdgeManager.currentTime() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = EnvironmentEdgeManager.currentTime();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
     * StringBuilder perfDump = new StringBuilder();
     * for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
     *   perfDump.append(String.format("csvread,%s,%d,%d%n",
     *     description, pt.getFirst(), pt.getSecond()));
     * }
     * if (dumpTimePerf) {
     *   Iterator<Triple<Long, Double, Long>> timePerf =
     *     reader.getMetrics().getCombinedTimeSeries();
     *   while (timePerf.hasNext()) {
     *     Triple<Long, Double, Long> pt = timePerf.next();
     *     perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
     *       description, pt.getFirst(), pt.getThird(), pt.getSecond()));
     *   }
     * }
     * LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
     */
    status(description + " test took "
      + (EnvironmentEdgeManager.currentTime() - testStartTime) / 1000 + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }
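  /*
   * Stripe-related table settings used below, with illustrative flag values: "-initstripes 10"
   * pre-splits each region's stores into 10 stripes and raises the blocking store file count to
   * 10x that, so heavy ingestion is not throttled by the store file limit; "-splitsize" and
   * "-splitparts" control when a stripe splits and into how many new stripes.
   */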
  private HTableDescriptor createHtd(boolean isStripe) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        htd.setConfiguration(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY,
          initialStripeCount.toString());
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY,
          Long.toString(10 * initialStripeCount));
      } else {
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return htd;
  }

  protected void createTable(HTableDescriptor htd) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit()
      .split(util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(htd, splits);
  }

  /**
   * Data generator that shards the keyspace and writes a monotonically increasing sequence
   * within each shard: for example, key 14 with 3 partitions becomes "0000002" + "0000000014"
   * (zero-padded shard prefix followed by the zero-padded sequence number).
   */
  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}