001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Random;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HTableDescriptor;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
044import org.apache.hadoop.hbase.ipc.RpcExecutor;
045import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
050import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
051import org.junit.After;
052import org.junit.AfterClass;
053import org.junit.Before;
054import org.junit.BeforeClass;
055import org.junit.ClassRule;
056import org.junit.Ignore;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064@Category({ MediumTests.class, ClientTests.class })
065public class TestFastFail {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestFastFail.class);
070
071  private static final Logger LOG = LoggerFactory.getLogger(TestFastFail.class);
072  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
073  private static byte[] FAMILY = Bytes.toBytes("testFamily");
074  private static final Random random = new Random();
075  private static int SLAVES = 1;
076  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
077  private static final int SLEEPTIME = 5000;
078
079  @Rule
080  public TestName name = new TestName();
081
082  /**
083   * @throws java.lang.Exception
084   */
085  @BeforeClass
086  public static void setUpBeforeClass() throws Exception {
087    // Just to prevent fastpath FIFO from picking calls up bypassing the queue.
088    TEST_UTIL.getConfiguration().set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
089    TEST_UTIL.startMiniCluster(SLAVES);
090  }
091
092  /**
093   * @throws java.lang.Exception
094   */
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  /**
101   * @throws java.lang.Exception
102   */
103  @Before
104  public void setUp() throws Exception {
105    MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
106    CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0);
107  }
108
109  /**
110   * @throws java.lang.Exception
111   */
112  @After
113  public void tearDown() throws Exception {
114    // Nothing to do.
115  }
116
117  @Ignore("Can go zombie -- see HBASE-14421; FIX")
118  @Test
119  public void testFastFail() throws IOException, InterruptedException {
120    Admin admin = TEST_UTIL.getAdmin();
121
122    final String tableName = name.getMethodName();
123    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName)));
124    desc.addFamily(new HColumnDescriptor(FAMILY));
125    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
126    final long numRows = 1000;
127
128    Configuration conf = TEST_UTIL.getConfiguration();
129    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
130    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
131    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
132    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
133    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
134      MyPreemptiveFastFailInterceptor.class, PreemptiveFastFailInterceptor.class);
135
136    final Connection connection = ConnectionFactory.createConnection(conf);
137
138    /**
139     * Write numRows worth of data, so that the workers can arbitrarily read.
140     */
141    List<Put> puts = new ArrayList<>();
142    for (long i = 0; i < numRows; i++) {
143      byte[] rowKey = longToByteArrayKey(i);
144      Put put = new Put(rowKey);
145      byte[] value = rowKey; // value is the same as the row key
146      put.addColumn(FAMILY, QUALIFIER, value);
147      puts.add(put);
148    }
149    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
150      table.put(puts);
151      LOG.info("Written all puts.");
152    }
153
154    /**
155     * The number of threads that are going to perform actions against the test table.
156     */
157    int nThreads = 100;
158    ExecutorService service = Executors.newFixedThreadPool(nThreads);
159    final CountDownLatch continueOtherHalf = new CountDownLatch(1);
160    final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
161
162    final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
163    final AtomicInteger numFailedThreads = new AtomicInteger(0);
164
165    // The total time taken for the threads to perform the second put;
166    final AtomicLong totalTimeTaken = new AtomicLong(0);
167    final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
168    final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
169
170    List<Future<Boolean>> futures = new ArrayList<>();
171    for (int i = 0; i < nThreads; i++) {
172      futures.add(service.submit(new Callable<Boolean>() {
173        /**
174         * The workers are going to perform a couple of reads. The second read will follow the
175         * killing of a regionserver so that we make sure that some of threads go into
176         * PreemptiveFastFailExcception
177         */
178        @Override
179        public Boolean call() throws Exception {
180          try (Table table = connection.getTable(TableName.valueOf(tableName))) {
181            Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here
182            byte[] row = longToByteArrayKey(Math.abs(random.nextLong()) % numRows);
183            Get g = new Get(row);
184            g.addColumn(FAMILY, QUALIFIER);
185            try {
186              table.get(g);
187            } catch (Exception e) {
188              LOG.debug("Get failed : ", e);
189              doneHalfway.countDown();
190              return false;
191            }
192
193            // Done with one get, proceeding to do the next one.
194            doneHalfway.countDown();
195            continueOtherHalf.await();
196
197            long startTime = EnvironmentEdgeManager.currentTime();
198            g = new Get(row);
199            g.addColumn(FAMILY, QUALIFIER);
200            try {
201              table.get(g);
202              // The get was successful
203              numSuccessfullThreads.addAndGet(1);
204            } catch (Exception e) {
205              if (e instanceof PreemptiveFastFailException) {
206                // We were issued a PreemptiveFastFailException
207                numPreemptiveFastFailExceptions.addAndGet(1);
208              }
209              // Irrespective of PFFE, the request failed.
210              numFailedThreads.addAndGet(1);
211              return false;
212            } finally {
213              long enTime = EnvironmentEdgeManager.currentTime();
214              totalTimeTaken.addAndGet(enTime - startTime);
215              if ((enTime - startTime) >= SLEEPTIME) {
216                // Considering the slow workers as the blockedWorkers.
217                // This assumes that the threads go full throttle at performing
218                // actions. In case the thread scheduling itself is as slow as
219                // SLEEPTIME, then this test might fail and so, we might have
220                // set it to a higher number on slower machines.
221                numBlockedWorkers.addAndGet(1);
222              }
223            }
224            return true;
225          } catch (Exception e) {
226            LOG.error("Caught unknown exception", e);
227            doneHalfway.countDown();
228            return false;
229          }
230        }
231      }));
232    }
233
234    doneHalfway.await();
235
236    // Kill a regionserver
237    TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
238    TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");
239
240    // Let the threads continue going
241    continueOtherHalf.countDown();
242
243    Thread.sleep(2 * SLEEPTIME);
244    // Start a RS in the cluster
245    TEST_UTIL.getHBaseCluster().startRegionServer();
246
247    int numThreadsReturnedFalse = 0;
248    int numThreadsReturnedTrue = 0;
249    int numThreadsThrewExceptions = 0;
250    for (Future<Boolean> f : futures) {
251      try {
252        numThreadsReturnedTrue += f.get() ? 1 : 0;
253        numThreadsReturnedFalse += f.get() ? 0 : 1;
254      } catch (Exception e) {
255        numThreadsThrewExceptions++;
256      }
257    }
258    LOG.debug("numThreadsReturnedFalse:" + numThreadsReturnedFalse + " numThreadsReturnedTrue:"
259      + numThreadsReturnedTrue + " numThreadsThrewExceptions:" + numThreadsThrewExceptions
260      + " numFailedThreads:" + numFailedThreads.get() + " numSuccessfullThreads:"
261      + numSuccessfullThreads.get() + " numBlockedWorkers:" + numBlockedWorkers.get()
262      + " totalTimeWaited: "
263      + totalTimeTaken.get()
264        / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers.get())
265      + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
266
267    assertEquals(
268      "The expected number of all the successfull and the failed "
269        + "threads should equal the total number of threads that we spawned",
270      nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
271    assertEquals("All the failures should be coming from the secondput failure",
272      numFailedThreads.get(), numThreadsReturnedFalse);
273    assertEquals("Number of threads that threw execution exceptions " + "otherwise should be 0", 0,
274      numThreadsThrewExceptions);
275    assertEquals(
276      "The regionservers that returned true should equal to the" + " number of successful threads",
277      numThreadsReturnedTrue, numSuccessfullThreads.get());
278    assertTrue("There will be atleast one thread that retried instead of failing",
279      MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
280    assertTrue("There will be atleast one PreemptiveFastFail exception,"
281      + " otherwise, the test makes little sense." + "numPreemptiveFastFailExceptions: "
282      + numPreemptiveFastFailExceptions.get(), numPreemptiveFastFailExceptions.get() > 0);
283
284    assertTrue(
285      "Only few thread should ideally be waiting for the dead "
286        + "regionserver to be coming back. numBlockedWorkers:" + numBlockedWorkers.get()
287        + " threads that retried : " + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
288      numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls.get());
289  }
290
291  @Test
292  public void testCallQueueTooBigExceptionDoesntTriggerPffe() throws Exception {
293    Admin admin = TEST_UTIL.getAdmin();
294
295    final String tableName = name.getMethodName();
296    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes.toBytes(tableName)));
297    desc.addFamily(new HColumnDescriptor(FAMILY));
298    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3);
299
300    Configuration conf = TEST_UTIL.getConfiguration();
301    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
302    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500);
303    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
304
305    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
306    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
307    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
308      CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class);
309
310    final Connection connection = ConnectionFactory.createConnection(conf);
311
312    // Set max call queues size to 0
313    SimpleRpcScheduler srs = (SimpleRpcScheduler) TEST_UTIL.getHBaseCluster().getRegionServer(0)
314      .getRpcServer().getScheduler();
315    Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
316    newConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
317    srs.onConfigurationChange(newConf);
318
319    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
320      Get get = new Get(new byte[1]);
321      table.get(get);
322    } catch (Throwable ex) {
323    }
324
325    assertEquals(
326      "We should have not entered PFFE mode on CQTBE, but we did;"
327        + " number of times this mode should have been entered:",
328      0, CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get());
329
330    newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
331    newConf.setInt("hbase.ipc.server.max.callqueue.length", 250);
332    srs.onConfigurationChange(newConf);
333  }
334
335  public static class MyPreemptiveFastFailInterceptor extends PreemptiveFastFailInterceptor {
336    public static AtomicInteger numBraveSouls = new AtomicInteger();
337
338    @Override
339    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
340      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
341      if (ret) numBraveSouls.addAndGet(1);
342      return ret;
343    }
344
345    public MyPreemptiveFastFailInterceptor(Configuration conf) {
346      super(conf);
347    }
348  }
349
350  private byte[] longToByteArrayKey(long rowKey) {
351    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
352  }
353
354  public static class CallQueueTooBigPffeInterceptor extends PreemptiveFastFailInterceptor {
355    public static AtomicInteger numCallQueueTooBig = new AtomicInteger();
356
357    @Override
358    protected void handleFailureToServer(ServerName serverName, Throwable t) {
359      super.handleFailureToServer(serverName, t);
360      numCallQueueTooBig.incrementAndGet();
361    }
362
363    public CallQueueTooBigPffeInterceptor(Configuration conf) {
364      super(conf);
365    }
366  }
367}