001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HTableDescriptor;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.hadoop.hbase.wal.WALEdit;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049
050/**
051 * This class is for testing HBaseConnectionManager ServerBusyException.
052 * Be careful adding to this class. It sets a low
053 * HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
054 */
055@Category({LargeTests.class})
056public class TestServerBusyException {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060      HBaseClassTestRule.forClass(TestServerBusyException.class);
061
062  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063  private static final byte[] FAM_NAM = Bytes.toBytes("f");
064  private static final byte[] ROW = Bytes.toBytes("bbb");
065  private static final int RPC_RETRY = 5;
066
067  @Rule
068  public TestName name = new TestName();
069
070  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
071    public static final int SLEEP_TIME = 5000;
072    @Override
073    public Optional<RegionObserver> getRegionObserver() {
074      return Optional.of(this);
075    }
076
077    @Override
078    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
079        final Get get, final List<Cell> results) throws IOException {
080      Threads.sleep(SLEEP_TIME);
081    }
082
083    @Override
084    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
085        final Put put, final WALEdit edit, final Durability durability) throws IOException {
086      Threads.sleep(SLEEP_TIME);
087    }
088
089    @Override
090    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
091                               final Increment increment) throws IOException {
092      Threads.sleep(SLEEP_TIME);
093      return null;
094    }
095
096    @Override
097    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
098        final WALEdit edit, final Durability durability) throws IOException {
099      Threads.sleep(SLEEP_TIME);
100    }
101
102  }
103
104  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
105    public static final int SLEEP_TIME = 2000;
106    static final AtomicLong ct = new AtomicLong(0);
107
108    @Override
109    public Optional<RegionObserver> getRegionObserver() {
110      return Optional.of(this);
111    }
112
113    @Override
114    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
115        final Get get, final List<Cell> results) throws IOException {
116      // After first sleep, all requests are timeout except the last retry. If we handle
117      // all the following requests, finally the last request is also timeout. If we drop all
118      // timeout requests, we can handle the last request immediately and it will not timeout.
119      if (ct.incrementAndGet() <= 1) {
120        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
121      } else {
122        Threads.sleep(SLEEP_TIME);
123      }
124    }
125  }
126
127  @BeforeClass
128  public static void setUpBeforeClass() throws Exception {
129    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
130    // Up the handlers; this test needs more than usual.
131    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
132    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
133    // simulate queue blocking in testDropTimeoutRequest
134    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
135    // Needed by the server busy test.
136    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
137    TEST_UTIL.startMiniCluster(2);
138  }
139
140  @AfterClass public static void tearDownAfterClass() throws Exception {
141    TEST_UTIL.shutdownMiniCluster();
142  }
143
144  private static class TestPutThread extends Thread {
145    Table table;
146    int getServerBusyException = 0;
147
148    TestPutThread(Table table){
149      this.table = table;
150    }
151
152    @Override
153    public void run() {
154      try {
155        Put p = new Put(ROW);
156        p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
157        table.put(p);
158      } catch (ServerTooBusyException e) {
159        getServerBusyException = 1;
160      } catch (IOException ignore) {
161      }
162    }
163  }
164
165  private static class TestGetThread extends Thread {
166    Table table;
167    int getServerBusyException = 0;
168
169    TestGetThread(Table table){
170      this.table = table;
171    }
172
173    @Override
174    public void run() {
175      try {
176        Get g = new Get(ROW);
177        g.addColumn(FAM_NAM, new byte[] { 0 });
178        table.get(g);
179      } catch (ServerTooBusyException e) {
180        getServerBusyException = 1;
181      } catch (IOException ignore) {
182      }
183    }
184  }
185
186  @Test()
187  public void testServerBusyException() throws Exception {
188    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
189    hdt.addCoprocessor(SleepCoprocessor.class.getName());
190    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
191    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
192
193    TestGetThread tg1 =
194        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
195    TestGetThread tg2 =
196        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
197    TestGetThread tg3 =
198        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
199    TestGetThread tg4 =
200        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
201    TestGetThread tg5 =
202        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
203    tg1.start();
204    tg2.start();
205    tg3.start();
206    tg4.start();
207    tg5.start();
208    tg1.join();
209    tg2.join();
210    tg3.join();
211    tg4.join();
212    tg5.join();
213    assertEquals(2,
214        tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
215            + tg4.getServerBusyException + tg5.getServerBusyException);
216
217    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
218    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
219
220    TestPutThread tp1 =
221        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
222    TestPutThread tp2 =
223        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
224    TestPutThread tp3 =
225        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
226    TestPutThread tp4 =
227        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
228    TestPutThread tp5 =
229        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
230    tp1.start();
231    tp2.start();
232    tp3.start();
233    tp4.start();
234    tp5.start();
235    tp1.join();
236    tp2.join();
237    tp3.join();
238    tp4.join();
239    tp5.join();
240    assertEquals(2,
241        tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
242            + tp4.getServerBusyException + tp5.getServerBusyException);
243  }
244}