001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Random;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseConfiguration;
037import org.apache.hadoop.hbase.HBaseTestingUtility;
038import org.apache.hadoop.hbase.HColumnDescriptor;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.HTableDescriptor;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
044import org.apache.hadoop.hbase.ipc.RpcExecutor;
045import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
046import org.apache.hadoop.hbase.testclassification.ClientTests;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.apache.hadoop.hbase.util.LoadTestKVGenerator;
050import org.junit.After;
051import org.junit.AfterClass;
052import org.junit.Before;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Ignore;
056import org.junit.Rule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.junit.rules.TestName;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063@Category({MediumTests.class, ClientTests.class})
064public class TestFastFail {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068      HBaseClassTestRule.forClass(TestFastFail.class);
069
070  private static final Logger LOG = LoggerFactory.getLogger(TestFastFail.class);
071  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
072  private static byte[] FAMILY = Bytes.toBytes("testFamily");
073  private static final Random random = new Random();
074  private static int SLAVES = 1;
075  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
076  private static final int SLEEPTIME = 5000;
077
078  @Rule
079  public TestName name = new TestName();
080
081  /**
082   * @throws java.lang.Exception
083   */
084  @BeforeClass
085  public static void setUpBeforeClass() throws Exception {
086    // Just to prevent fastpath FIFO from picking calls up bypassing the queue.
087    TEST_UTIL.getConfiguration().set(
088      RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, "deadline");
089    TEST_UTIL.startMiniCluster(SLAVES);
090  }
091
092  /**
093   * @throws java.lang.Exception
094   */
095  @AfterClass
096  public static void tearDownAfterClass() throws Exception {
097    TEST_UTIL.shutdownMiniCluster();
098  }
099
100  /**
101   * @throws java.lang.Exception
102   */
103  @Before
104  public void setUp() throws Exception {
105    MyPreemptiveFastFailInterceptor.numBraveSouls.set(0);
106    CallQueueTooBigPffeInterceptor.numCallQueueTooBig.set(0);
107  }
108
109  /**
110   * @throws java.lang.Exception
111   */
112  @After
113  public void tearDown() throws Exception {
114    // Nothing to do.
115  }
116
117  @Ignore ("Can go zombie -- see HBASE-14421; FIX") @Test
118  public void testFastFail() throws IOException, InterruptedException {
119    Admin admin = TEST_UTIL.getAdmin();
120
121    final String tableName = name.getMethodName();
122    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
123        .toBytes(tableName)));
124    desc.addFamily(new HColumnDescriptor(FAMILY));
125    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
126    final long numRows = 1000;
127
128    Configuration conf = TEST_UTIL.getConfiguration();
129    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
130    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
131    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
132    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
133    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
134        MyPreemptiveFastFailInterceptor.class,
135        PreemptiveFastFailInterceptor.class);
136
137    final Connection connection = ConnectionFactory.createConnection(conf);
138
139    /**
140     * Write numRows worth of data, so that the workers can arbitrarily read.
141     */
142    List<Put> puts = new ArrayList<>();
143    for (long i = 0; i < numRows; i++) {
144      byte[] rowKey = longToByteArrayKey(i);
145      Put put = new Put(rowKey);
146      byte[] value = rowKey; // value is the same as the row key
147      put.addColumn(FAMILY, QUALIFIER, value);
148      puts.add(put);
149    }
150    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
151      table.put(puts);
152      LOG.info("Written all puts.");
153    }
154
155    /**
156     * The number of threads that are going to perform actions against the test
157     * table.
158     */
159    int nThreads = 100;
160    ExecutorService service = Executors.newFixedThreadPool(nThreads);
161    final CountDownLatch continueOtherHalf = new CountDownLatch(1);
162    final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
163
164    final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
165    final AtomicInteger numFailedThreads = new AtomicInteger(0);
166
167    // The total time taken for the threads to perform the second put;
168    final AtomicLong totalTimeTaken = new AtomicLong(0);
169    final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
170    final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
171
172    List<Future<Boolean>> futures = new ArrayList<>();
173    for (int i = 0; i < nThreads; i++) {
174      futures.add(service.submit(new Callable<Boolean>() {
175        /**
176         * The workers are going to perform a couple of reads. The second read
177         * will follow the killing of a regionserver so that we make sure that
178         * some of threads go into PreemptiveFastFailExcception
179         */
180        @Override
181        public Boolean call() throws Exception {
182          try (Table table = connection.getTable(TableName.valueOf(tableName))) {
183            Thread.sleep(Math.abs(random.nextInt()) % 250); // Add some jitter here
184            byte[] row = longToByteArrayKey(Math.abs(random.nextLong())
185                % numRows);
186            Get g = new Get(row);
187            g.addColumn(FAMILY, QUALIFIER);
188            try {
189              table.get(g);
190            } catch (Exception e) {
191              LOG.debug("Get failed : ", e);
192              doneHalfway.countDown();
193              return false;
194            }
195
196            // Done with one get, proceeding to do the next one.
197            doneHalfway.countDown();
198            continueOtherHalf.await();
199
200            long startTime = System.currentTimeMillis();
201            g = new Get(row);
202            g.addColumn(FAMILY, QUALIFIER);
203            try {
204              table.get(g);
205              // The get was successful
206              numSuccessfullThreads.addAndGet(1);
207            } catch (Exception e) {
208              if (e instanceof PreemptiveFastFailException) {
209                // We were issued a PreemptiveFastFailException
210                numPreemptiveFastFailExceptions.addAndGet(1);
211              }
212              // Irrespective of PFFE, the request failed.
213              numFailedThreads.addAndGet(1);
214              return false;
215            } finally {
216              long enTime = System.currentTimeMillis();
217              totalTimeTaken.addAndGet(enTime - startTime);
218              if ((enTime - startTime) >= SLEEPTIME) {
219                // Considering the slow workers as the blockedWorkers.
220                // This assumes that the threads go full throttle at performing
221                // actions. In case the thread scheduling itself is as slow as
222                // SLEEPTIME, then this test might fail and so, we might have
223                // set it to a higher number on slower machines.
224                numBlockedWorkers.addAndGet(1);
225              }
226            }
227            return true;
228          } catch (Exception e) {
229            LOG.error("Caught unknown exception", e);
230            doneHalfway.countDown();
231            return false;
232          }
233        }
234      }));
235    }
236
237    doneHalfway.await();
238
239    // Kill a regionserver
240    TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
241    TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");
242
243    // Let the threads continue going
244    continueOtherHalf.countDown();
245
246    Thread.sleep(2 * SLEEPTIME);
247    // Start a RS in the cluster
248    TEST_UTIL.getHBaseCluster().startRegionServer();
249
250    int numThreadsReturnedFalse = 0;
251    int numThreadsReturnedTrue = 0;
252    int numThreadsThrewExceptions = 0;
253    for (Future<Boolean> f : futures) {
254      try {
255        numThreadsReturnedTrue += f.get() ? 1 : 0;
256        numThreadsReturnedFalse += f.get() ? 0 : 1;
257      } catch (Exception e) {
258        numThreadsThrewExceptions++;
259      }
260    }
261    LOG.debug("numThreadsReturnedFalse:"
262        + numThreadsReturnedFalse
263        + " numThreadsReturnedTrue:"
264        + numThreadsReturnedTrue
265        + " numThreadsThrewExceptions:"
266        + numThreadsThrewExceptions
267        + " numFailedThreads:"
268        + numFailedThreads.get()
269        + " numSuccessfullThreads:"
270        + numSuccessfullThreads.get()
271        + " numBlockedWorkers:"
272        + numBlockedWorkers.get()
273        + " totalTimeWaited: "
274        + totalTimeTaken.get()
275        / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers
276            .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
277
278    assertEquals("The expected number of all the successfull and the failed "
279        + "threads should equal the total number of threads that we spawned",
280        nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
281    assertEquals(
282        "All the failures should be coming from the secondput failure",
283        numFailedThreads.get(), numThreadsReturnedFalse);
284    assertEquals("Number of threads that threw execution exceptions "
285        + "otherwise should be 0", 0, numThreadsThrewExceptions);
286    assertEquals("The regionservers that returned true should equal to the"
287        + " number of successful threads", numThreadsReturnedTrue,
288        numSuccessfullThreads.get());
289    assertTrue(
290        "There will be atleast one thread that retried instead of failing",
291        MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
292    assertTrue(
293        "There will be atleast one PreemptiveFastFail exception,"
294            + " otherwise, the test makes little sense."
295            + "numPreemptiveFastFailExceptions: "
296            + numPreemptiveFastFailExceptions.get(),
297        numPreemptiveFastFailExceptions.get() > 0);
298
299    assertTrue(
300        "Only few thread should ideally be waiting for the dead "
301            + "regionserver to be coming back. numBlockedWorkers:"
302            + numBlockedWorkers.get() + " threads that retried : "
303            + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
304        numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
305            .get());
306  }
307
308  @Test
309  public void testCallQueueTooBigExceptionDoesntTriggerPffe() throws Exception {
310    Admin admin = TEST_UTIL.getAdmin();
311
312    final String tableName = name.getMethodName();
313    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
314      .toBytes(tableName)));
315    desc.addFamily(new HColumnDescriptor(FAMILY));
316    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 3);
317
318    Configuration conf = TEST_UTIL.getConfiguration();
319    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 100);
320    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 500);
321    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
322
323    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
324    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
325    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
326      CallQueueTooBigPffeInterceptor.class, PreemptiveFastFailInterceptor.class);
327
328    final Connection connection = ConnectionFactory.createConnection(conf);
329
330    //Set max call queues size to 0
331    SimpleRpcScheduler srs = (SimpleRpcScheduler)
332      TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().getScheduler();
333    Configuration newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
334    newConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
335    srs.onConfigurationChange(newConf);
336
337    try (Table table = connection.getTable(TableName.valueOf(tableName))) {
338      Get get = new Get(new byte[1]);
339      table.get(get);
340    } catch (Throwable ex) {
341    }
342
343    assertEquals("We should have not entered PFFE mode on CQTBE, but we did;"
344      + " number of times this mode should have been entered:", 0,
345      CallQueueTooBigPffeInterceptor.numCallQueueTooBig.get());
346
347    newConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
348    newConf.setInt("hbase.ipc.server.max.callqueue.length", 250);
349    srs.onConfigurationChange(newConf);
350  }
351
352  public static class MyPreemptiveFastFailInterceptor extends
353      PreemptiveFastFailInterceptor {
354    public static AtomicInteger numBraveSouls = new AtomicInteger();
355
356    @Override
357    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
358      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
359      if (ret)
360        numBraveSouls.addAndGet(1);
361      return ret;
362    }
363
364    public MyPreemptiveFastFailInterceptor(Configuration conf) {
365      super(conf);
366    }
367  }
368
369  private byte[] longToByteArrayKey(long rowKey) {
370    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
371  }
372
373  public static class CallQueueTooBigPffeInterceptor extends
374    PreemptiveFastFailInterceptor {
375    public static AtomicInteger numCallQueueTooBig = new AtomicInteger();
376
377    @Override
378    protected void handleFailureToServer(ServerName serverName, Throwable t) {
379      super.handleFailureToServer(serverName, t);
380      numCallQueueTooBig.incrementAndGet();
381    }
382
383    public CallQueueTooBigPffeInterceptor(Configuration conf) {
384      super(conf);
385    }
386  }
387}