/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Locale;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreEngine;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A perf test which does large data ingestion using stripe compactions and regular compactions.
 */
@InterfaceAudience.Private
public class StripeCompactionsPerformanceEvaluation extends AbstractHBaseTool {
  private static final Logger LOG =
    LoggerFactory.getLogger(StripeCompactionsPerformanceEvaluation.class);

  private static final TableName TABLE_NAME =
    TableName.valueOf(StripeCompactionsPerformanceEvaluation.class.getSimpleName());
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("CF");
  private static final int MIN_NUM_SERVERS = 1;

  // Option names.
  private static final String DATAGEN_KEY = "datagen";
  private static final String ITERATIONS_KEY = "iters";
  private static final String PRELOAD_COUNT_KEY = "pwk";
  private static final String WRITE_COUNT_KEY = "wk";
  private static final String WRITE_THREADS_KEY = "wt";
  private static final String READ_THREADS_KEY = "rt";
  private static final String INITIAL_STRIPE_COUNT_KEY = "initstripes";
  private static final String SPLIT_SIZE_KEY = "splitsize";
  private static final String SPLIT_PARTS_KEY = "splitparts";
  private static final String VALUE_SIZE_KEY = "valsize";
  private static final String SEQ_SHARDS_PER_SERVER_KEY = "seqshards";
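  // A hedged usage sketch, not part of the original source: assuming this tool is run
  // through HBase's generic class launcher (whether a main() entry point exists is not
  // shown in this file), an invocation using the options declared above might look like:
  //
  //   hbase org.apache.hadoop.hbase.StripeCompactionsPerformanceEvaluation \
  //     -datagen sequential -seqshards 3 -iters 2 \
  //     -pwk 1000000 -wk 1000000 -wt 10 -rt 20 -valsize 512:4096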
  // Option values.
  private LoadTestDataGenerator dataGen;
  private int iterationCount;
  private long preloadKeys;
  private long writeKeys;
  private int writeThreads;
  private int readThreads;
  private Long initialStripeCount;
  private Long splitSize;
  private Long splitParts;

  private static final String VALUE_SIZE_DEFAULT = "512:4096";

  protected IntegrationTestingUtility util = new IntegrationTestingUtility();

  @Override
  protected void addOptions() {
    addOptWithArg(DATAGEN_KEY, "Type of data generator to use (default or sequential)");
    addOptWithArg(SEQ_SHARDS_PER_SERVER_KEY, "Sequential generator will shard the data into many"
      + " sequences. The number of such shards per server is specified (default is 1).");
    addOptWithArg(ITERATIONS_KEY, "Number of iterations to run to compare");
    addOptWithArg(PRELOAD_COUNT_KEY, "Number of keys to preload, per server");
    addOptWithArg(WRITE_COUNT_KEY, "Number of keys to write, per server");
    addOptWithArg(WRITE_THREADS_KEY, "Number of threads to use for writing");
    addOptWithArg(READ_THREADS_KEY, "Number of threads to use for reading");
    addOptWithArg(INITIAL_STRIPE_COUNT_KEY, "Number of stripes to split regions into initially");
    addOptWithArg(SPLIT_SIZE_KEY, "Size at which a stripe will split into more stripes");
    addOptWithArg(SPLIT_PARTS_KEY, "Number of stripes to split a stripe into when it splits");
    addOptWithArg(VALUE_SIZE_KEY, "Value size; either a number, or a colon-separated range;"
      + " default " + VALUE_SIZE_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    int minValueSize = 0, maxValueSize = 0;
    String valueSize = cmd.getOptionValue(VALUE_SIZE_KEY, VALUE_SIZE_DEFAULT);
    if (valueSize.contains(":")) {
      String[] valueSizes = valueSize.split(":");
      // Validate the length of the split array, not of the original string.
      if (valueSizes.length != 2) throw new RuntimeException("Invalid value size: " + valueSize);
      minValueSize = Integer.parseInt(valueSizes[0]);
      maxValueSize = Integer.parseInt(valueSizes[1]);
    } else {
      minValueSize = maxValueSize = Integer.parseInt(valueSize);
    }
    String datagen = cmd.getOptionValue(DATAGEN_KEY, "default").toLowerCase(Locale.ROOT);
    if ("default".equals(datagen)) {
      dataGen = new MultiThreadedAction.DefaultDataGenerator(
        minValueSize, maxValueSize, 1, 1, new byte[][] { COLUMN_FAMILY });
    } else if ("sequential".equals(datagen)) {
      int shards = Integer.parseInt(cmd.getOptionValue(SEQ_SHARDS_PER_SERVER_KEY, "1"));
      dataGen = new SeqShardedDataGenerator(minValueSize, maxValueSize, shards);
    } else {
      throw new RuntimeException("Unknown " + DATAGEN_KEY + ": " + datagen);
    }
    iterationCount = Integer.parseInt(cmd.getOptionValue(ITERATIONS_KEY, "1"));
    preloadKeys = Long.parseLong(cmd.getOptionValue(PRELOAD_COUNT_KEY, "1000000"));
    writeKeys = Long.parseLong(cmd.getOptionValue(WRITE_COUNT_KEY, "1000000"));
    writeThreads = Integer.parseInt(cmd.getOptionValue(WRITE_THREADS_KEY, "10"));
    readThreads = Integer.parseInt(cmd.getOptionValue(READ_THREADS_KEY, "20"));
    initialStripeCount = getLongOrNull(cmd, INITIAL_STRIPE_COUNT_KEY);
    splitSize = getLongOrNull(cmd, SPLIT_SIZE_KEY);
    splitParts = getLongOrNull(cmd, SPLIT_PARTS_KEY);
  }

  private Long getLongOrNull(CommandLine cmd, String option) {
    if (!cmd.hasOption(option)) return null;
    return Long.parseLong(cmd.getOptionValue(option));
  }
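  // Worked example of the value-size parsing above (derived from the code, not from the
  // original comments): "-valsize 1024" yields fixed 1024-byte values (min == max),
  // while the default "512:4096" splits on ':' into minValueSize=512 and
  // maxValueSize=4096. Omitting -initstripes/-splitsize/-splitparts leaves the
  // corresponding fields null, so createHtd() below falls back to engine defaults.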
  @Override
  public Configuration getConf() {
    Configuration c = super.getConf();
    if (c == null && util != null) {
      conf = util.getConfiguration();
      c = conf;
    }
    return c;
  }

  @Override
  protected int doWork() throws Exception {
    setUp();
    try {
      boolean isStripe = true;
      for (int i = 0; i < iterationCount * 2; ++i) {
        createTable(isStripe);
        runOneTest((isStripe ? "Stripe" : "Default") + i, conf);
        isStripe = !isStripe;
      }
      return 0;
    } finally {
      tearDown();
    }
  }

  private void setUp() throws Exception {
    this.util = new IntegrationTestingUtility();
    LOG.debug("Initializing/checking cluster has " + MIN_NUM_SERVERS + " servers");
    util.initializeCluster(MIN_NUM_SERVERS);
    LOG.debug("Done initializing/checking cluster");
  }

  protected void deleteTable() throws Exception {
    if (util.getAdmin().tableExists(TABLE_NAME)) {
      LOG.info("Deleting table");
      if (!util.getAdmin().isTableDisabled(TABLE_NAME)) {
        util.getAdmin().disableTable(TABLE_NAME);
      }
      util.getAdmin().deleteTable(TABLE_NAME);
      LOG.info("Deleted table");
    }
  }

  private void createTable(boolean isStripe) throws Exception {
    createTable(createHtd(isStripe));
  }

  private void tearDown() throws Exception {
    deleteTable();
    LOG.info("Restoring the cluster");
    util.restoreCluster();
    LOG.info("Done restoring the cluster");
  }
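  // Worked example of the key-range arithmetic in runOneTest() below (numbers are
  // hypothetical): with 3 live servers, "-pwk 1000000 -wk 1000000" preloads keys
  // [0, 3000000) and then reads/writes keys [3000000, 6000000); i.e. preloadKeys and
  // writeKeys are per-server counts, matching their option descriptions.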
  private void runOneTest(String description, Configuration conf) throws Exception {
    int numServers = util.getHBaseClusterInterface()
      .getClusterMetrics().getLiveServerMetrics().size();
    long startKey = preloadKeys * numServers;
    long endKey = startKey + writeKeys * numServers;
    status(String.format("%s test starting on %d servers; preloading 0 to %d and writing to %d",
      description, numServers, startKey, endKey));

    if (preloadKeys > 0) {
      MultiThreadedWriter preloader = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
      long time = System.currentTimeMillis();
      preloader.start(0, startKey, writeThreads);
      preloader.waitForFinish();
      if (preloader.getNumWriteFailures() > 0) {
        throw new IOException("Preload failed");
      }
      int waitTime = (int) Math.min(preloadKeys / 100, 30000); // arbitrary
      status(description + " preload took " + (System.currentTimeMillis() - time) / 1000
        + "sec; sleeping for " + waitTime / 1000 + "sec for store to stabilize");
      Thread.sleep(waitTime);
    }

    MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
    MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, TABLE_NAME, 100);
    // reader.getMetrics().enable();
    reader.linkToWriter(writer);

    long testStartTime = System.currentTimeMillis();
    writer.start(startKey, endKey, writeThreads);
    reader.start(startKey, endKey, readThreads);
    writer.waitForFinish();
    reader.waitForFinish();
    // reader.waitForVerification(300000);
    // reader.abortAndWaitForFinish();
    status("Readers and writers stopped for test " + description);

    boolean success = writer.getNumWriteFailures() == 0;
    if (!success) {
      LOG.error("Write failed");
    } else {
      success = reader.getNumReadErrors() == 0 && reader.getNumReadFailures() == 0;
      if (!success) {
        LOG.error("Read failed");
      }
    }

    // Dump perf regardless of the result.
    /*
    StringBuilder perfDump = new StringBuilder();
    for (Pair<Long, Long> pt : reader.getMetrics().getCombinedCdf()) {
      perfDump.append(String.format(
        "csvread,%s,%d,%d%n", description, pt.getFirst(), pt.getSecond()));
    }
    if (dumpTimePerf) {
      Iterator<Triple<Long, Double, Long>> timePerf = reader.getMetrics().getCombinedTimeSeries();
      while (timePerf.hasNext()) {
        Triple<Long, Double, Long> pt = timePerf.next();
        perfDump.append(String.format("csvtime,%s,%d,%d,%.4f%n",
          description, pt.getFirst(), pt.getThird(), pt.getSecond()));
      }
    }
    LOG.info("Performance data dump for " + description + " test: \n" + perfDump.toString());
    */
    status(description + " test took " + (System.currentTimeMillis() - testStartTime) / 1000
      + "sec");
    Assert.assertTrue(success);
  }

  private static void status(String s) {
    LOG.info("STATUS " + s);
    System.out.println(s);
  }

  private HTableDescriptor createHtd(boolean isStripe) throws Exception {
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    String noSplitsPolicy = DisabledRegionSplitPolicy.class.getName();
    htd.setConfiguration(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, noSplitsPolicy);
    if (isStripe) {
      htd.setConfiguration(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
      if (initialStripeCount != null) {
        htd.setConfiguration(
          StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, initialStripeCount.toString());
        htd.setConfiguration(
          HStore.BLOCKING_STOREFILES_KEY, Long.toString(10 * initialStripeCount));
      } else {
        htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "500");
      }
      if (splitSize != null) {
        htd.setConfiguration(StripeStoreConfig.SIZE_TO_SPLIT_KEY, splitSize.toString());
      }
      if (splitParts != null) {
        htd.setConfiguration(StripeStoreConfig.SPLIT_PARTS_KEY, splitParts.toString());
      }
    } else {
      htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "10"); // default
    }
    return htd;
  }
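  // Hypothetical illustration of createHtd() above (derived from this file's logic, not
  // from the original comments): "-initstripes 5" sets
  // StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY to "5" and the blocking store file limit
  // to 10 * 5 = 50, since each stripe keeps its own set of store files and presumably
  // needs headroom; without -initstripes the stripe run uses a generous limit of 500,
  // while the default-engine run keeps the stock limit of 10.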
  protected void createTable(HTableDescriptor htd) throws Exception {
    deleteTable();
    if (util.getHBaseClusterInterface() instanceof MiniHBaseCluster) {
      LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
      htd.setConfiguration(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
    }
    byte[][] splits = new RegionSplitter.HexStringSplit().split(
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());
    util.getAdmin().createTable(htd, splits);
  }

  public static class SeqShardedDataGenerator extends LoadTestDataGenerator {
    private static final byte[][] COLUMN_NAMES = new byte[][] { Bytes.toBytes("col1") };
    private static final int PAD_TO = 10;
    private static final int PREFIX_PAD_TO = 7;

    private final int numPartitions;

    public SeqShardedDataGenerator(int minValueSize, int maxValueSize, int numPartitions) {
      super(minValueSize, maxValueSize);
      this.numPartitions = numPartitions;
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      String num = StringUtils.leftPad(String.valueOf(keyBase), PAD_TO, "0");
      return Bytes.toBytes(getPrefix(keyBase) + num);
    }

    private String getPrefix(long i) {
      return StringUtils.leftPad(String.valueOf((int) (i % numPartitions)), PREFIX_PAD_TO, "0");
    }

    @Override
    public byte[][] getColumnFamilies() {
      return new byte[][] { COLUMN_FAMILY };
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      return COLUMN_NAMES;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return true;
    }
  }
}