/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestIngest;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MultiThreadedReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * An IntegrationTest for doing reads with a timeout, to a read-only table with region replicas.
 * A ChaosMonkey is run that kills region servers and the master, but ensures that the meta
 * region server is not killed and that at most two region servers are dead at any point in time.
 * The expected behavior is that all reads with stale mode true will return before the timeout
 * (5 sec by default). The test fails if a read request does not finish in time.
 * <p>
 * This test uses LoadTestTool to read and write the data from a single client with multiple
 * threads. The data is written first, then we allow the region replicas to catch up. Then we
 * start the reader threads doing get requests with stale mode true.
 * ChaosMonkey is started after a delay (20 sec by default) after the reader threads are started
 * so that there is enough time to fully cache meta. These parameters (and some other parameters
 * from LoadTestTool) can be used to control behavior; the values given are the defaults:
 *
 * <pre>
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_regions_per_server=5
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.get_timeout_ms=5000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_keys_per_server=2500
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.region_replication=3
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=20
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.chaos_monkey_delay=20000
 * </pre>
 *
 * Use this test with the "serverKilling" ChaosMonkey. Sample usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.test.IntegrationTestTimeBoundedRequestsWithRegionReplicas
 * -Dhbase.IntegrationTestTimeBoundedRequestsWithRegionReplicas.runtime=600000
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_write_threads=40
 * -DIntegrationTestTimeBoundedRequestsWithRegionReplicas.num_read_threads=40
 * -Dhbase.ipc.client.allowsInterrupt=true --monkey serverKilling
 * </pre>
 */
@Category(IntegrationTests.class)
public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends IntegrationTestIngest {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestTimeBoundedRequestsWithRegionReplicas.class);

  private static final String TEST_NAME =
    IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName();

  protected static final long DEFAULT_GET_TIMEOUT = 5000; // 5 sec
  protected static final String GET_TIMEOUT_KEY = "get_timeout_ms";

  protected static final long DEFAULT_CHAOS_MONKEY_DELAY = 20 * 1000; // 20 sec
  protected static final String CHAOS_MONKEY_DELAY_KEY = "chaos_monkey_delay";

  protected static final int DEFAULT_REGION_REPLICATION = 3;

  @Override
  protected void startMonkey() throws Exception {
    // we do not want to start the monkey at the start of the test.
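    // It is instead scheduled from runIngestTest(), once the readers have had time to cache
    // the region locations.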
  }

  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    // default replication for this test is 3
    String clazz = this.getClass().getSimpleName();
    conf.setIfUnset(String.format("%s.%s", clazz, LoadTestTool.OPT_REGION_REPLICATION),
      Integer.toString(DEFAULT_REGION_REPLICATION));
  }

  protected void writeData(int colsPerKey, int recordSize, int writeThreads, long startKey,
    long numKeys) throws IOException {
    int ret = loadTool.run(getArgsForLoadTestTool("-write",
      String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Load failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  }

  @Override
  protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {
    LOG.info("Cluster size: "
      + util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

    long start = EnvironmentEdgeManager.currentTime();
    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
    long startKey = 0;

    long numKeys = getNumKeys(keysPerServerPerIter);

    // write data once
    LOG.info("Writing some data to the table");
    writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

    // flush the table
    LOG.info("Flushing the table");
    Admin admin = util.getAdmin();
    admin.flush(getTablename());

    // re-open the regions to make sure that the replicas are up to date
    long refreshTime =
      conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
    if (refreshTime > 0 && refreshTime <= 10000) {
      LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
      Threads.sleep(refreshTime * 3);
    } else {
      LOG.info("Reopening the table");
      admin.disableTable(getTablename());
      admin.enableTable(getTablename());
    }

    // We should only start the ChaosMonkey after the readers are started and have cached
    // all of the region locations. Because the meta is not replicated, the time-bounded reads
    // will time out if the meta server is killed. Since the readers read random keys, the
    // configured delay (20 sec by default) should be enough to cache every region entry.
    long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY),
      DEFAULT_CHAOS_MONKEY_DELAY);
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    LOG.info(String.format("ChaosMonkey delay is: %d seconds. Will start %s "
      + "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
    ScheduledFuture<?> result = executorService.schedule(new Runnable() {
      @Override
      public void run() {
        try {
          LOG.info("Starting ChaosMonkey");
          monkey.start();
          monkey.waitForStop();
        } catch (Exception e) {
          LOG.warn(StringUtils.stringifyException(e));
        }
      }
    }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);

    // set the intended run time for the reader. The reader will do read requests
    // to random keys for this amount of time.
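    // The time already spent writing, flushing, and waiting for the replicas counts against
    // the configured total runtime.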
    long remainingTime = runtime - (EnvironmentEdgeManager.currentTime() - start);
    if (remainingTime <= 0) {
      LOG.error("The amount of time left for the test to perform random reads is "
        + "non-positive. Increase the test execution time via "
        + String.format(RUN_TIME_KEY,
          IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
        + " or reduce the amount of data written per server via "
        + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName() + "."
        + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
      throw new IllegalArgumentException("No time remains to execute random reads");
    }
    LOG.info("Reading random keys from the table for " + remainingTime / 60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName()),
      remainingTime); // load tool shares the same conf

    // now start the readers, which will run for the configured run time
    try {
      int ret = loadTool.run(
        getArgsForLoadTestTool("-read", String.format("100:%d", readThreads), startKey, numKeys));
      if (0 != ret) {
        String errorMsg = "Verification failed with error code " + ret;
        LOG.error(errorMsg);
        Assert.fail(errorMsg);
      }
    } finally {
      if (result != null) {
        result.cancel(false);
      }
      monkey.stop("Stopping the test");
      monkey.waitForStop();
      executorService.shutdown();
    }
  }

  @Override
  protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
    long numKeys) {
    List<String> args =
      Lists.newArrayList(super.getArgsForLoadTestTool(mode, modeSpecificArg, startKey, numKeys));
    args.add("-reader");
    args.add(TimeBoundedMultiThreadedReader.class.getName());
    return args.toArray(new String[args.size()]);
  }

  public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader {
    protected long timeoutNano;
    protected AtomicLong timedOutReads = new AtomicLong();
    protected long runTime;
    protected Thread timeoutThread;
    protected AtomicLong staleReads = new AtomicLong();

    public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) throws IOException {
      super(dataGen, conf, tableName, verifyPercent);
      long timeoutMs =
        conf.getLong(String.format("%s.%s", TEST_NAME, GET_TIMEOUT_KEY), DEFAULT_GET_TIMEOUT);
      timeoutNano = timeoutMs * 1000000;
      LOG.info("Timeout for gets: " + timeoutMs + " ms");
      String runTimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
      this.runTime = conf.getLong(runTimeKey, -1);
      if (this.runTime <= 0) {
        throw new IllegalArgumentException("Please configure " + runTimeKey);
      }
    }

    @Override
    public void waitForFinish() {
      try {
        this.timeoutThread.join();
      } catch (InterruptedException e) {
        // Restore interrupt status
        Thread.currentThread().interrupt();
      }
      this.aborted = true;
      super.waitForFinish();
    }

    @Override
    protected String progressInfo() {
      StringBuilder builder = new StringBuilder(super.progressInfo());
      appendToStatus(builder, "stale_reads", staleReads.get());
      appendToStatus(builder, "get_timeouts", timedOutReads.get());
      return builder.toString();
    }

    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
      super.start(startKey, endKey, numThreads);
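      // Launch the watchdog thread; once it has counted down the run time, waitForFinish()
      // unblocks and aborts the reader threads.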
      this.timeoutThread = new TimeoutThread(this.runTime);
      this.timeoutThread.start();
    }

    @Override
    protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
      return new TimeBoundedMultiThreadedReaderThread(readerId);
    }

    private static class TimeoutThread extends Thread {
      long timeout;
      long reportInterval = 60000;

      public TimeoutThread(long timeout) {
        this.timeout = timeout;
      }

      @Override
      public void run() {
        while (true) {
          long rem = Math.min(timeout, reportInterval);
          if (rem <= 0) {
            break;
          }
          LOG.info("Remaining execution time: " + timeout / 60000 + " min");
          Threads.sleep(rem);
          timeout -= rem;
        }
      }
    }

    public class TimeBoundedMultiThreadedReaderThread
      extends MultiThreadedReader.HBaseReaderThread {

      public TimeBoundedMultiThreadedReaderThread(int readerId) throws IOException {
        super(readerId);
      }

      @Override
      protected Get createGet(long keyToRead) throws IOException {
        Get get = super.createGet(keyToRead);
        get.setConsistency(Consistency.TIMELINE);
        return get;
      }

      @Override
      protected long getNextKeyToRead() {
        // always read a random key, assuming that the writer has finished writing all keys
        long key =
          startKey + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE) % (endKey - startKey);
        return key;
      }

      @Override
      protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
        Result[] results, Table table, boolean isNullExpected) throws IOException {
        super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table,
          isNullExpected);
        for (Result r : results) {
          if (r.isStale()) {
            staleReads.incrementAndGet();
          }
        }
        // We do not actually time out and cancel the reads after the timeout. We just wait
        // for the RPC to complete, but if the request took longer than the timeout, we treat
        // that as an error.
        if (elapsedNano > timeoutNano) {
          timedOutReads.incrementAndGet();
          numReadFailures.addAndGet(1); // fail the test
          for (Result r : results) {
            LOG.error("FAILED FOR " + r);
            RegionLocations rl =
              ((ClusterConnection) connection).locateRegion(tableName, r.getRow(), true, true);
            HRegionLocation[] locations = rl.getRegionLocations();
            for (HRegionLocation h : locations) {
              LOG.error("LOCATION " + h);
            }
          }
        }
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret =
      ToolRunner.run(conf, new IntegrationTestTimeBoundedRequestsWithRegionReplicas(), args);
    System.exit(ret);
  }
}