/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the client with fast-fail mode enabled
 * ({@link HConstants#HBASE_CLIENT_FAST_FAIL_MODE_ENABLED}) against a mini cluster, using
 * counting subclasses of {@link PreemptiveFastFailInterceptor} to observe what the
 * interceptor did.
 */
@Category({MediumTests.class, ClientTests.class})
public class TestFastFail {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFastFail.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFastFail.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
  private static final Random random = new Random();
  private static final int SLAVES = 1;
  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static final int SLEEPTIME = 5000;
  /** Server-side setting toggled by the call-queue test. */
  private static final String MAX_CALL_QUEUE_LENGTH_KEY = "hbase.ipc.server.max.callqueue.length";

  @Rule
  public TestName name = new TestName();

  /**
   * Starts a mini cluster with {@link #SLAVES} region server(s), using the "deadline" call
   * queue type.
   *
   * @throws Exception if the mini cluster fails to start
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Just to prevent fastpath FIFO from picking calls up bypassing the queue.
    TEST_UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * Shuts the mini cluster down.
   *
   * @throws Exception if the shutdown fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Resets the static counters of both test interceptors so each test starts from zero.
   *
   * @throws Exception never thrown by the current body; kept for signature compatibility
   */
  @Before
  public void setUp() throws Exception {
    MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
    CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0);
  }

  /**
   * Per-test teardown hook; intentionally empty.
   *
   * @throws Exception never thrown by the current body; kept for signature compatibility
   */
  @After
  public void tearDown() throws Exception {
    // Nothing to do.
  }

  /**
   * Spawns a pool of workers that each read a random row twice. Between the two reads a
   * region server is stopped, so the second read is expected to either succeed, fail, or
   * fail with a {@link PreemptiveFastFailException}. The test then checks the bookkeeping
   * of the workers against the counters of {@link MyPreemptiveFastFailInterceptor}.
   *
   * @throws IOException if table creation, the initial writes, restarting the region server
   *         or closing the connection fails
   * @throws InterruptedException if the test thread is interrupted while waiting
   */
  @Ignore("Can go zombie -- see HBASE-14421; FIX")
  @Test
  public void testFastFail() throws IOException, InterruptedException {
    Admin admin = TEST_UTIL.getAdmin();

    final String tableName = name.getMethodName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName)));
    desc.addFamily(new HColumnDescriptor(FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
    final long numRows = 1000;

    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
        MyPreemptiveFastFailInterceptor.class, PreemptiveFastFailInterceptor.class);

    // The connection is closed on every exit path, including assertion failures.
    try (final Connection connection = ConnectionFactory.createConnection(conf)) {
      // Write numRows worth of data, so that the workers can arbitrarily read.
      List<Put> puts = new ArrayList<>();
      for (long i = 0; i < numRows; i++) {
        byte[] rowKey = longToByteArrayKey(i);
        Put put = new Put(rowKey);
        byte[] value = rowKey; // value is the same as the row key
        put.addColumn(FAMILY, QUALIFIER, value);
        puts.add(put);
      }
      try (Table table = connection.getTable(TableName.valueOf(tableName))) {
        table.put(puts);
        LOG.info("Written all puts.");
      }

      // The number of threads that are going to perform actions against the test table.
      final int nThreads = 100;
      ExecutorService service = Executors.newFixedThreadPool(nThreads);
      try {
        final CountDownLatch continueOtherHalf = new CountDownLatch(1);
        final CountDownLatch doneHalfway = new CountDownLatch(nThreads);

        final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
        final AtomicInteger numFailedThreads = new AtomicInteger(0);

        // The total time taken for the threads to perform the second read.
        final AtomicLong totalTimeTaken = new AtomicLong(0);
        final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
        final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);

        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < nThreads; i++) {
          futures.add(service.submit(new Callable<Boolean>() {
            /**
             * The workers are going to perform a couple of reads. The second read
             * will follow the killing of a regionserver so that we make sure that
             * some of threads go into PreemptiveFastFailException.
             */
            @Override
            public Boolean call() throws Exception {
              try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                // Add some jitter here. nextInt(bound) is never negative, unlike
                // Math.abs(nextInt()) which stays negative for Integer.MIN_VALUE.
                Thread.sleep(random.nextInt(250));
                // Clear the sign bit rather than Math.abs(): Math.abs(Long.MIN_VALUE) is
                // still negative and would yield a negative row index.
                byte[] row = longToByteArrayKey((random.nextLong() & Long.MAX_VALUE) % numRows);
                Get g = new Get(row);
                g.addColumn(FAMILY, QUALIFIER);
                try {
                  table.get(g);
                } catch (Exception e) {
                  LOG.debug("Get failed : ", e);
                  doneHalfway.countDown();
                  return false;
                }

                // Done with one get, proceeding to do the next one.
                doneHalfway.countDown();
                continueOtherHalf.await();

                long startTime = System.currentTimeMillis();
                g = new Get(row);
                g.addColumn(FAMILY, QUALIFIER);
                try {
                  table.get(g);
                  // The get was successful
                  numSuccessfullThreads.addAndGet(1);
                } catch (Exception e) {
                  if (e instanceof PreemptiveFastFailException) {
                    // We were issued a PreemptiveFastFailException
                    numPreemptiveFastFailExceptions.addAndGet(1);
                  }
                  // Irrespective of PFFE, the request failed.
                  numFailedThreads.addAndGet(1);
                  return false;
                } finally {
                  long enTime = System.currentTimeMillis();
                  totalTimeTaken.addAndGet(enTime - startTime);
                  if ((enTime - startTime) >= SLEEPTIME) {
                    // Considering the slow workers as the blockedWorkers.
                    // This assumes that the threads go full throttle at performing
                    // actions. In case the thread scheduling itself is as slow as
                    // SLEEPTIME, then this test might fail and so, we might have
                    // set it to a higher number on slower machines.
                    numBlockedWorkers.addAndGet(1);
                  }
                }
                return true;
              } catch (Exception e) {
                LOG.error("Caught unknown exception", e);
                doneHalfway.countDown();
                return false;
              }
            }
          }));
        }

        doneHalfway.await();

        // Kill a regionserver
        TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
        TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");

        // Let the threads continue going
        continueOtherHalf.countDown();

        Thread.sleep(2 * SLEEPTIME);
        // Start a RS in the cluster
        TEST_UTIL.getHBaseCluster().startRegionServer();

        int numThreadsReturnedFalse = 0;
        int numThreadsReturnedTrue = 0;
        int numThreadsThrewExceptions = 0;
        for (Future<Boolean> f : futures) {
          try {
            // Resolve each future once and classify the outcome.
            if (f.get()) {
              numThreadsReturnedTrue++;
            } else {
              numThreadsReturnedFalse++;
            }
          } catch (Exception e) {
            LOG.debug("Worker future did not complete normally", e);
            numThreadsThrewExceptions++;
          }
        }
        LOG.debug("numThreadsReturnedFalse:"
            + numThreadsReturnedFalse
            + " numThreadsReturnedTrue:"
            + numThreadsReturnedTrue
            + " numThreadsThrewExceptions:"
            + numThreadsThrewExceptions
            + " numFailedThreads:"
            + numFailedThreads.get()
            + " numSuccessfullThreads:"
            + numSuccessfullThreads.get()
            + " numBlockedWorkers:"
            + numBlockedWorkers.get()
            + " totalTimeWaited: "
            + totalTimeTaken.get()
            / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers.get())
            + " numPFFEs: " + numPreemptiveFastFailExceptions.get());

        assertEquals("The expected number of all the successfull and the failed "
            + "threads should equal the total number of threads that we spawned",
            nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
        assertEquals(
            "All the failures should be coming from the secondput failure",
            numFailedThreads.get(), numThreadsReturnedFalse);
        assertEquals("Number of threads that threw execution exceptions "
            + "otherwise should be 0", 0, numThreadsThrewExceptions);
        assertEquals("The regionservers that returned true should equal to the"
            + " number of successful threads", numThreadsReturnedTrue,
            numSuccessfullThreads.get());
        assertTrue(
            "There will be atleast one thread that retried instead of failing",
            MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
        assertTrue(
            "There will be atleast one PreemptiveFastFail exception,"
                + " otherwise, the test makes little sense."
                + "numPreemptiveFastFailExceptions: "
                + numPreemptiveFastFailExceptions.get(),
            numPreemptiveFastFailExceptions.get() > 0);

        assertTrue(
            "Only few thread should ideally be waiting for the dead "
                + "regionserver to be coming back. numBlockedWorkers:"
                + numBlockedWorkers.get() + " threads that retried : "
                + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
            numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls.get());
      } finally {
        // Do not leak the worker threads, whatever the outcome of the test. This runs
        // before the connection is closed so workers are interrupted first.
        service.shutdownNow();
      }
    }
  }

  /**
   * Sets the region server's max call queue length to 0, issues a single get, and verifies
   * that {@link CallQueueTooBigPffeInterceptor} never recorded a server failure, i.e. that
   * the rejected call did not push the client into fast-fail mode.
   *
   * @throws Exception if table creation, connection setup or teardown fails
   */
  @Test
  public void testCallQueueTooBigExceptionDoesntTriggerPffe() throws Exception {
    Admin admin = TEST_UTIL.getAdmin();

    final String tableName = name.getMethodName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName)));
    desc.addFamily(new HColumnDescriptor(FAMILY));
    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3);

    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);

    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
        CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class);

    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Set max call queues size to 0
      SimpleRpcScheduler srs = (SimpleRpcScheduler)
          TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler();
      Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
      newConf.setInt(MAX_CALL_QUEUE_LENGTH_KEY, 0);
      srs.onConfigurationChange(newConf);

      try {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
          Get get = new Get(new byte[1]);
          table.get(get);
        } catch (Throwable ex) {
          // The outcome of the get itself is irrelevant to this test; only the
          // interceptor counter checked below matters.
          LOG.debug("Ignoring failure of get issued against a zero-length call queue", ex);
        }

        assertEquals("We should have not entered PFFE mode on CQTBE, but we did;"
            + " number of times this mode should have been entered:", 0,
            CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get());
      } finally {
        // Always give the scheduler a usable queue again, even when the assertion fails,
        // so that the shared mini cluster is not left rejecting every call.
        newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
        newConf.setInt(MAX_CALL_QUEUE_LENGTH_KEY, 250);
        srs.onConfigurationChange(newConf);
      }
    }
  }

  /**
   * Interceptor that counts, in {@link #numBraveSouls}, how many times the parent
   * implementation decided to retry in spite of fast-fail.
   */
  public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor {
    public static final AtomicInteger numBraveSouls = new AtomicInteger();

    @Override
    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
      if (ret) {
        numBraveSouls.incrementAndGet();
      }
      return ret;
    }

    public MyPreemptiveFastFailInterceptor(Configuration conf) {
      super(conf);
    }
  }

  /**
   * Builds the row key used for logical row number {@code rowKey}.
   *
   * @param rowKey the logical row number
   * @return the bytes of {@link LoadTestKVGenerator#md5PrefixedKey(long)} for that number
   */
  private static byte[] longToByteArrayKey(long rowKey) {
    // Bytes.toBytes encodes with a fixed charset instead of the platform default.
    return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(rowKey));
  }

  /**
   * Interceptor that counts, in {@link #numCallQueueTooBig}, every invocation of
   * {@link #handleFailureToServer(ServerName, Throwable)}.
   */
  public static class CallQueueTooBigPffeInterceptor extends PreemptiveFastFailInterceptor {
    public static final AtomicInteger numCallQueueTooBig = new AtomicInteger();

    @Override
    protected void handleFailureToServer(ServerName serverName, Throwable t) {
      super.handleFailureToServer(serverName, t);
      numCallQueueTooBig.incrementAndGet();
    }

    public CallQueueTooBigPffeInterceptor(Configuration conf) {
      super(conf);
    }
  }
}