001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Random; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.HBaseTestingUtility; 038import org.apache.hadoop.hbase.HColumnDescriptor; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.HTableDescriptor; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; 044import org.apache.hadoop.hbase.ipc.RpcExecutor; 045import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler; 046import org.apache.hadoop.hbase.testclassification.ClientTests; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.util.LoadTestKVGenerator; 051import org.junit.After; 052import org.junit.AfterClass; 053import org.junit.Before; 054import org.junit.BeforeClass; 055import org.junit.ClassRule; 056import org.junit.Ignore; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064@Category({ MediumTests.class, ClientTests.class }) 065public class TestFastFail { 066 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestFastFail.class); 070 071 private static final Logger LOG = LoggerFactory.getLogger(TestFastFail.class); 072 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 073 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 074 private static final Random random = new Random(); 075 private static int SLAVES = 1; 076 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 077 private static final int SLEEPTIME = 5000; 078 079 @Rule 080 public TestName name = new TestName(); 081 082 /** 083 * @throws java.lang.Exception 084 */ 085 @BeforeClass 086 public static void setUpBeforeClass() throws Exception { 087 // Just to prevent fastpath FIFO from picking calls up bypassing the queue. 088 TEST_UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline"); 089 TEST_UTIL.startMiniCluster(SLAVES); 090 } 091 092 /** 093 * @throws java.lang.Exception 094 */ 095 @AfterClass 096 public static void tearDownAfterClass() throws Exception { 097 TEST_UTIL.shutdownMiniCluster(); 098 } 099 100 /** 101 * @throws java.lang.Exception 102 */ 103 @Before 104 public void setUp() throws Exception { 105 MyPreemptiveFastFailInterceptor.numBraveSouls.set(0); 106 CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0); 107 } 108 109 /** 110 * @throws java.lang.Exception 111 */ 112 @After 113 public void tearDown() throws Exception { 114 // Nothing to do. 115 } 116 117 @Ignore("Can go zombie -- see HBASE-14421; FIX") 118 @Test 119 public void testFastFail() throws IOException, InterruptedException { 120 Admin admin = TEST_UTIL.getAdmin(); 121 122 final String tableName = name.getMethodName(); 123 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName))); 124 desc.addFamily(new HColumnDescriptor(FAMILY)); 125 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32); 126 final long numRows = 1000; 127 128 Configuration conf = TEST_UTIL.getConfiguration(); 129 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100); 130 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10); 131 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); 132 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); 133 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, 134 MyPreemptiveFastFailInterceptor.class, PreemptiveFastFailInterceptor.class); 135 136 final Connection connection = ConnectionFactory.createConnection(conf); 137 138 /** 139 * Write numRows worth of data, so that the workers can arbitrarily read. 140 */ 141 List<Put> puts = new ArrayList<>(); 142 for (long i = 0; i < numRows; i++) { 143 byte[] rowKey = longToByteArrayKey(i); 144 Put put = new Put(rowKey); 145 byte[] value = rowKey; // value is the same as the row key 146 put.addColumn(FAMILY, QUALIFIER, value); 147 puts.add(put); 148 } 149 try (Table table = connection.getTable(TableName.valueOf(tableName))) { 150 table.put(puts); 151 LOG.info("Written all puts."); 152 } 153 154 /** 155 * The number of threads that are going to perform actions against the test table. 156 */ 157 int nThreads = 100; 158 ExecutorService service = Executors.newFixedThreadPool(nThreads); 159 final CountDownLatch continueOtherHalf = new CountDownLatch(1); 160 final CountDownLatch doneHalfway = new CountDownLatch(nThreads); 161 162 final AtomicInteger numSuccessfullThreads = new AtomicInteger(0); 163 final AtomicInteger numFailedThreads = new AtomicInteger(0); 164 165 // The total time taken for the threads to perform the second put; 166 final AtomicLong totalTimeTaken = new AtomicLong(0); 167 final AtomicInteger numBlockedWorkers = new AtomicInteger(0); 168 final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0); 169 170 List<Future<Boolean>> futures = new ArrayList<>(); 171 for (int i = 0; i < nThreads; i++) { 172 futures.add(service.submit(new Callable<Boolean>() { 173 /** 174 * The workers are going to perform a couple of reads. The second read will follow the 175 * killing of a regionserver so that we make sure that some of threads go into 176 * PreemptiveFastFailExcception 177 */ 178 @Override 179 public Boolean call() throws Exception { 180 try (Table table = connection.getTable(TableName.valueOf(tableName))) { 181 Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here 182 byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) % numRows); 183 Get g = new Get(row); 184 g.addColumn(FAMILY, QUALIFIER); 185 try { 186 table.get(g); 187 } catch (Exception e) { 188 LOG.debug("Get failed : ", e); 189 doneHalfway.countDown(); 190 return false; 191 } 192 193 // Done with one get, proceeding to do the next one. 194 doneHalfway.countDown(); 195 continueOtherHalf.await(); 196 197 long startTime = EnvironmentEdgeManager.currentTime(); 198 g = new Get(row); 199 g.addColumn(FAMILY, QUALIFIER); 200 try { 201 table.get(g); 202 // The get was successful 203 numSuccessfullThreads.addAndGet(1); 204 } catch (Exception e) { 205 if (e instanceof PreemptiveFastFailException) { 206 // We were issued a PreemptiveFastFailException 207 numPreemptiveFastFailExceptions.addAndGet(1); 208 } 209 // Irrespective of PFFE, the request failed. 210 numFailedThreads.addAndGet(1); 211 return false; 212 } finally { 213 long enTime = EnvironmentEdgeManager.currentTime(); 214 totalTimeTaken.addAndGet(enTime - startTime); 215 if ((enTime - startTime) >= SLEEPTIME) { 216 // Considering the slow workers as the blockedWorkers. 217 // This assumes that the threads go full throttle at performing 218 // actions. In case the thread scheduling itself is as slow as 219 // SLEEPTIME, then this test might fail and so, we might have 220 // set it to a higher number on slower machines. 221 numBlockedWorkers.addAndGet(1); 222 } 223 } 224 return true; 225 } catch (Exception e) { 226 LOG.error("Caught unknown exception", e); 227 doneHalfway.countDown(); 228 return false; 229 } 230 } 231 })); 232 } 233 234 doneHalfway.await(); 235 236 // Kill a regionserver 237 TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop(); 238 TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing"); 239 240 // Let the threads continue going 241 continueOtherHalf.countDown(); 242 243 Thread.sleep(2 * SLEEPTIME); 244 // Start a RS in the cluster 245 TEST_UTIL.getHBaseCluster().startRegionServer(); 246 247 int numThreadsReturnedFalse = 0; 248 int numThreadsReturnedTrue = 0; 249 int numThreadsThrewExceptions = 0; 250 for (Future<Boolean> f : futures) { 251 try { 252 numThreadsReturnedTrue += f.get() ? 1 : 0; 253 numThreadsReturnedFalse += f.get() ? 0 : 1; 254 } catch (Exception e) { 255 numThreadsThrewExceptions++; 256 } 257 } 258 LOG.debug("numThreadsReturnedFalse:" + numThreadsReturnedFalse + " numThreadsReturnedTrue:" 259 + numThreadsReturnedTrue + " numThreadsThrewExceptions:" + numThreadsThrewExceptions 260 + " numFailedThreads:" + numFailedThreads.get() + " numSuccessfullThreads:" 261 + numSuccessfullThreads.get() + " numBlockedWorkers:" + numBlockedWorkers.get() 262 + " totalTimeWaited: " 263 + totalTimeTaken.get() 264 / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers.get()) 265 + " numPFFEs: " + numPreemptiveFastFailExceptions.get()); 266 267 assertEquals( 268 "The expected number of all the successfull and the failed " 269 + "threads should equal the total number of threads that we spawned", 270 nThreads, numFailedThreads.get() + numSuccessfullThreads.get()); 271 assertEquals("All the failures should be coming from the secondput failure", 272 numFailedThreads.get(), numThreadsReturnedFalse); 273 assertEquals("Number of threads that threw execution exceptions " + "otherwise should be 0", 0, 274 numThreadsThrewExceptions); 275 assertEquals( 276 "The regionservers that returned true should equal to the" + " number of successful threads", 277 numThreadsReturnedTrue, numSuccessfullThreads.get()); 278 assertTrue("There will be atleast one thread that retried instead of failing", 279 MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0); 280 assertTrue("There will be atleast one PreemptiveFastFail exception," 281 + " otherwise, the test makes little sense." + "numPreemptiveFastFailExceptions: " 282 + numPreemptiveFastFailExceptions.get(), numPreemptiveFastFailExceptions.get() > 0); 283 284 assertTrue( 285 "Only few thread should ideally be waiting for the dead " 286 + "regionserver to be coming back. numBlockedWorkers:" + numBlockedWorkers.get() 287 + " threads that retried : " + MyPreemptiveFastFailInterceptor.numBraveSouls.get(), 288 numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls.get()); 289 } 290 291 @Test 292 public void testCallQueueTooBigExceptionDoesntTriggerPffe() throws Exception { 293 Admin admin = TEST_UTIL.getAdmin(); 294 295 final String tableName = name.getMethodName(); 296 HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName))); 297 desc.addFamily(new HColumnDescriptor(FAMILY)); 298 admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3); 299 300 Configuration conf = TEST_UTIL.getConfiguration(); 301 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100); 302 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500); 303 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 304 305 conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); 306 conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0); 307 conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL, 308 CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class); 309 310 final Connection connection = ConnectionFactory.createConnection(conf); 311 312 // Set max call queues size to 0 313 SimpleRpcScheduler srs = (SimpleRpcScheduler) TEST_UTIL.getHBaseCluster().getRegionServer(0) 314 .getRpcServer().getScheduler(); 315 Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 316 newConf.setInt("hbase.ipc.server.max.callqueue.length", 0); 317 srs.onConfigurationChange(newConf); 318 319 try (Table table = connection.getTable(TableName.valueOf(tableName))) { 320 Get get = new Get(new byte[1]); 321 table.get(get); 322 } catch (Throwable ex) { 323 } 324 325 assertEquals( 326 "We should have not entered PFFE mode on CQTBE, but we did;" 327 + " number of times this mode should have been entered:", 328 0, CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get()); 329 330 newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); 331 newConf.setInt("hbase.ipc.server.max.callqueue.length", 250); 332 srs.onConfigurationChange(newConf); 333 } 334 335 public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor { 336 public static AtomicInteger numBraveSouls = new AtomicInteger(); 337 338 @Override 339 protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) { 340 boolean ret = super.shouldRetryInspiteOfFastFail(fInfo); 341 if (ret) numBraveSouls.addAndGet(1); 342 return ret; 343 } 344 345 public MyPreemptiveFastFailInterceptor(Configuration conf) { 346 super(conf); 347 } 348 } 349 350 private byte[] longToByteArrayKey(long rowKey) { 351 return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes(); 352 } 353 354 public static class CallQueueTooBigPffeInterceptor extends PreemptiveFastFailInterceptor { 355 public static AtomicInteger numCallQueueTooBig = new AtomicInteger(); 356 357 @Override 358 protected void handleFailureToServer(ServerName serverName, Throwable t) { 359 super.handleFailureToServer(serverName, t); 360 numCallQueueTooBig.incrementAndGet(); 361 } 362 363 public CallQueueTooBigPffeInterceptor(Configuration conf) { 364 super(conf); 365 } 366 } 367}