001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.coprocessor.ObserverContext;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.RegionObserver;
035import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047
048/**
049 * This class is for testing HBaseConnectionManager ServerBusyException.
050 * Be careful adding to this class. It sets a low
051 * HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
052 */
053@Category({LargeTests.class})
054public class TestServerBusyException {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestServerBusyException.class);
059
060  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061  private static final byte[] FAM_NAM = Bytes.toBytes("f");
062  private static final byte[] ROW = Bytes.toBytes("bbb");
063  private static final int RPC_RETRY = 5;
064
065  @Rule
066  public TestName name = new TestName();
067
068  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
069    public static final int SLEEP_TIME = 5000;
070    @Override
071    public Optional<RegionObserver> getRegionObserver() {
072      return Optional.of(this);
073    }
074
075    @Override
076    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
077        final Get get, final List<Cell> results) throws IOException {
078      Threads.sleep(SLEEP_TIME);
079    }
080
081    @Override
082    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
083        final Put put, final WALEdit edit, final Durability durability) throws IOException {
084      Threads.sleep(SLEEP_TIME);
085    }
086
087    @Override
088    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
089                               final Increment increment) throws IOException {
090      Threads.sleep(SLEEP_TIME);
091      return null;
092    }
093
094    @Override
095    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
096        final WALEdit edit, final Durability durability) throws IOException {
097      Threads.sleep(SLEEP_TIME);
098    }
099
100  }
101
102  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
103    public static final int SLEEP_TIME = 2000;
104    static final AtomicLong ct = new AtomicLong(0);
105
106    @Override
107    public Optional<RegionObserver> getRegionObserver() {
108      return Optional.of(this);
109    }
110
111    @Override
112    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
113        final Get get, final List<Cell> results) throws IOException {
114      // After first sleep, all requests are timeout except the last retry. If we handle
115      // all the following requests, finally the last request is also timeout. If we drop all
116      // timeout requests, we can handle the last request immediately and it will not timeout.
117      if (ct.incrementAndGet() <= 1) {
118        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
119      } else {
120        Threads.sleep(SLEEP_TIME);
121      }
122    }
123  }
124
125  @BeforeClass
126  public static void setUpBeforeClass() throws Exception {
127    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
128    // Up the handlers; this test needs more than usual.
129    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
130    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
131    // simulate queue blocking in testDropTimeoutRequest
132    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
133    // Needed by the server busy test.
134    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
135    TEST_UTIL.startMiniCluster(2);
136  }
137
138  @AfterClass public static void tearDownAfterClass() throws Exception {
139    TEST_UTIL.shutdownMiniCluster();
140  }
141
142  private static class TestPutThread extends Thread {
143    Table table;
144    int getServerBusyException = 0;
145
146    TestPutThread(Table table){
147      this.table = table;
148    }
149
150    @Override
151    public void run() {
152      try {
153        Put p = new Put(ROW);
154        p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
155        table.put(p);
156      } catch (ServerTooBusyException e) {
157        getServerBusyException = 1;
158      } catch (IOException ignore) {
159      }
160    }
161  }
162
163  private static class TestGetThread extends Thread {
164    Table table;
165    int getServerBusyException = 0;
166
167    TestGetThread(Table table){
168      this.table = table;
169    }
170
171    @Override
172    public void run() {
173      try {
174        Get g = new Get(ROW);
175        g.addColumn(FAM_NAM, new byte[] { 0 });
176        table.get(g);
177      } catch (ServerTooBusyException e) {
178        getServerBusyException = 1;
179      } catch (IOException ignore) {
180      }
181    }
182  }
183
184  @Test()
185  public void testServerBusyException() throws Exception {
186    TableDescriptor hdt = TEST_UTIL.createModifyableTableDescriptor(name.getMethodName())
187      .setCoprocessor(SleepCoprocessor.class.getName()).build();
188    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
189    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
190
191    TestGetThread tg1 =
192        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
193    TestGetThread tg2 =
194        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
195    TestGetThread tg3 =
196        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
197    TestGetThread tg4 =
198        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
199    TestGetThread tg5 =
200        new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
201    tg1.start();
202    tg2.start();
203    tg3.start();
204    tg4.start();
205    tg5.start();
206    tg1.join();
207    tg2.join();
208    tg3.join();
209    tg4.join();
210    tg5.join();
211    assertEquals(2,
212        tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
213            + tg4.getServerBusyException + tg5.getServerBusyException);
214
215    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
216    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
217
218    TestPutThread tp1 =
219        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
220    TestPutThread tp2 =
221        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
222    TestPutThread tp3 =
223        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
224    TestPutThread tp4 =
225        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
226    TestPutThread tp5 =
227        new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
228    tp1.start();
229    tp2.start();
230    tp3.start();
231    tp4.start();
232    tp5.start();
233    tp1.join();
234    tp2.join();
235    tp3.join();
236    tp4.join();
237    tp5.join();
238    assertEquals(2,
239        tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
240            + tp4.getServerBusyException + tp5.getServerBusyException);
241  }
242}