/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * An IntegrationTest for doing reads with a timeout, to a read-only table with region
 * replicas. ChaosMonkey is run which kills the region servers and master, but ensures
 * that the meta region server is not killed, and at most 2 region servers are dead at any point
 * in time. The expected behavior is that all reads with stale mode true will return
 * before the timeout (5 sec by default). The test fails if the read requests do not finish
 * in time.
 *
 * <p>This test uses LoadTestTool to read and write the data from a single client but
 * multiple threads. The data is written first, then we allow the region replicas to catch
 * up. Then we start the reader threads doing get requests with stale mode true.
 * ChaosMonkey is started after some delay (20 sec by default) after the reader threads are
 * started, so that there is enough time to fully cache meta.
 *
 * <p>These parameters (and some other parameters from LoadTestTool) can be used to
 * control behavior; the values given below are the defaults:
 * <pre>
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
 * </pre>
 * Use this test with the "serverKilling" ChaosMonkey. Sample usage:
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {

  private static final Logger LOG = LoggerFactory.getLogger(
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  private static final String TEST_NAME
    = IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  protected static final long DEFAULT_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  protected static final int DEFAULT_REGION_REPLICATION = 3;

  @Override
  protected void startMonkey() throws Exception {
    // we do not want to start the monkey at the start of the test.
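    // It is instead scheduled to start from runIngestTest(), once the reader threads have had
    // time to cache all region locations.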
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // default replication for this test is 3
    String clazz = this.getClass().getSimpleName();
    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
      Integer.toString(DEFAULT_REGION_REPLICATION));
  }

  protected void writeData(int colsPerKey, int recordSize, int writeThreads,
      long startKey, long numKeys) throws IOException {
    int ret = loadTool.run(getArgsForLoadTestTool("-write",
      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Load failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  @Override
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
      int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Cluster size: " +
      util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = System.currentTimeMillis();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);

    // write data once
    LOG.info("Writing some data to the table");
    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

    // flush the table
    LOG.info("Flushing the table");
    Admin admin = util.getAdmin();
    admin.flush(getTablename());

    // re-open the regions to make sure that the replicas are up to date
    long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    if (refreshTime > 0 && refreshTime <= 10000) {
      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
      Threads.sleep(refreshTime * 3);
    } else {
      LOG.info("Reopening the table");
      admin.disableTable(getTablename());
      admin.enableTable(getTablename());
    }

    // We should only start the ChaosMonkey after the readers are started and have cached
    // all of the region locations. Because meta is not replicated, the time-bounded reads
    // will time out if the meta server is killed.
    // We start the chaos monkey after the configured delay (20 sec by default); since the
    // readers are reading random keys, that should be enough time to cache every region entry.
    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
      DEFAULT_CHAOS_MONKEY_DELAY);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    LOG.info(String.format("ChaosMonkey delay is: %d seconds. Will start %s " +
      "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          LOG.info("Starting ChaosMonkey");
          monkey.start();
          monkey.waitForStop();
        } catch (Exception e) {
          LOG.warn(StringUtils.stringifyException(e));
        }
      }
    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);

    // set the intended run time for the reader. The reader will do read requests
    // to random keys for this amount of time.
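    // For example, with the default 10 min total runtime, if the write/flush/reopen phase
    // above took 2 min, the readers get the remaining ~8 min.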
    long remainingTime = runtime - (System.currentTimeMillis() - start);
    if (remainingTime <= 0) {
      LOG.error("The amount of time left for the test to perform random reads is "
        + "non-positive. Increase the test execution time via "
        + String.format(RUN_TIME_KEY,
          IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
        + " or reduce the amount of data written per server via "
        + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName()
        + "." + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
      throw new IllegalArgumentException("No time remains to execute random reads");
    }
    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
      remainingTime); // load tool shares the same conf

    // now start the readers, which will run for the configured run time
    try {
      int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads),
        startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }
    } finally {
      if (result != null) result.cancel(false);
      monkey.stop("Stopping the test");
      monkey.waitForStop();
      executorService.shutdown();
    }
  }

  @Override
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
      long numKeys) {
    List<String> args = Lists.newArrayList(super.getArgsForLoadTestTool(
      mode, modeSpecificArg, startKey, numKeys));
    args.add("-reader");
    args.add(TimeBoundedMultiThreadedReader.class.getName());
    return args.toArray(new String[args.size()]);
  }

  /**
   * A MultiThreadedReader that issues timeline-consistent gets, counts stale reads and reads
   * exceeding the configured get timeout, and bounds its own run time.
   */
  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    protected long timeoutNano;
    protected AtomicLong timedOutReads = new AtomicLong();
    protected long runTime;
    protected Thread timeoutThread;
    protected AtomicLong staleReads = new AtomicLong();

    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
        TableName tableName, double verifyPercent) throws IOException {
      super(dataGen, conf, tableName, verifyPercent);
      long timeoutMs = conf.getLong(
        String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
      timeoutNano = timeoutMs * 1000000;
      LOG.info("Timeout for gets: " + timeoutMs + "ms");
      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
      this.runTime = conf.getLong(runTimeKey, -1);
      if (this.runTime <= 0) {
        throw new IllegalArgumentException("Please configure " + runTimeKey);
      }
    }

    @Override
    public void waitForFinish() {
      try {
        this.timeoutThread.join();
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting for the timeout thread", e);
        Thread.currentThread().interrupt();
      }
      this.aborted = true;
      super.waitForFinish();
    }

    @Override
    protected String progressInfo() {
      StringBuilder builder = new StringBuilder(super.progressInfo());
      appendToStatus(builder, "stale_reads", staleReads.get());
      appendToStatus(builder, "get_timeouts", timedOutReads.get());
      return builder.toString();
    }

    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
      super.start(startKey, endKey, numThreads);
      this.timeoutThread = new TimeoutThread(this.runTime);
      this.timeoutThread.start();
    }

    @Override
    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
      return new TimeBoundedMultiThreadedReaderThread(readerId);
    }
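    /**
     * Counts down the remaining run time, logging it once a minute. waitForFinish() joins this
     * thread and then sets the aborted flag to stop the reader threads.
     */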
    private class TimeoutThread extends Thread {
      long timeout;
      long reportInterval = 60000;

      public TimeoutThread(long timeout) {
        this.timeout = timeout;
      }

      @Override
      public void run() {
        while (true) {
          long rem = Math.min(timeout, reportInterval);
          if (rem <= 0) {
            break;
          }
          LOG.info("Remaining execution time: " + timeout / 60000 + " min");
          Threads.sleep(rem);
          timeout -= rem;
        }
      }
    }

    public class TimeBoundedMultiThreadedReaderThread
        extends MultiThreadedReader.HBaseReaderThread {

      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
        super(readerId);
      }

      @Override
      protected Get createGet(long keyToRead) throws IOException {
        Get get = super.createGet(keyToRead);
        // TIMELINE consistency lets the get be served by any region replica, possibly
        // returning stale data
        get.setConsistency(Consistency.TIMELINE);
        return get;
      }

      @Override
      protected long getNextKeyToRead() {
        // always read a random key, assuming that the writer has finished writing all keys
        long key = startKey + Math.abs(RandomUtils.nextLong())
          % (endKey - startKey);
        return key;
      }

      @Override
      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
          Result[] results, Table table, boolean isNullExpected)
          throws IOException {
        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
          isNullExpected);
        for (Result r : results) {
          if (r.isStale()) staleReads.incrementAndGet();
        }
        // we do not actually time out and cancel the reads after the timeout. We just wait
        // for the RPC to complete, but if the request took longer than the timeout, we treat
        // that as an error.
        if (elapsedNano > timeoutNano) {
          timedOutReads.incrementAndGet();
          numReadFailures.addAndGet(1); // fail the test
          for (Result r : results) {
            LOG.error("FAILED FOR " + r);
            RegionLocations rl = ((ClusterConnection) connection).
              locateRegion(tableName, r.getRow(), true, true);
            HRegionLocation[] locations = rl.getRegionLocations();
            for (HRegionLocation h : locations) {
              LOG.error("LOCATION " + h);
            }
          }
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(),
      args);
    System.exit(ret);
  }
}