001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.HTableDescriptor;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
038import org.apache.hadoop.hbase.testclassification.LargeTests;
039import org.apache.hadoop.hbase.util.Bytes;
040import org.apache.hadoop.hbase.util.Threads;
041import org.apache.hadoop.hbase.wal.WALEdit;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049
050/**
051 * This class is for testing HBaseConnectionManager ServerBusyException. Be careful adding to this
052 * class. It sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
053 */
054@Category({ LargeTests.class })
055public class TestServerBusyException {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestServerBusyException.class);
060
061  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
062  private static final byte[] FAM_NAM = Bytes.toBytes("f");
063  private static final byte[] ROW = Bytes.toBytes("bbb");
064  private static final int RPC_RETRY = 5;
065
066  @Rule
067  public TestName name = new TestName();
068
069  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
070    public static final int SLEEP_TIME = 5000;
071
072    @Override
073    public Optional<RegionObserver> getRegionObserver() {
074      return Optional.of(this);
075    }
076
077    @Override
078    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
079      final List<Cell> results) throws IOException {
080      Threads.sleep(SLEEP_TIME);
081    }
082
083    @Override
084    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
085      final WALEdit edit, final Durability durability) throws IOException {
086      Threads.sleep(SLEEP_TIME);
087    }
088
089    @Override
090    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
091      final Increment increment) throws IOException {
092      Threads.sleep(SLEEP_TIME);
093      return null;
094    }
095
096    @Override
097    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
098      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
099      Threads.sleep(SLEEP_TIME);
100    }
101
102  }
103
104  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
105    public static final int SLEEP_TIME = 2000;
106    static final AtomicLong ct = new AtomicLong(0);
107
108    @Override
109    public Optional<RegionObserver> getRegionObserver() {
110      return Optional.of(this);
111    }
112
113    @Override
114    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
115      final List<Cell> results) throws IOException {
116      // After first sleep, all requests are timeout except the last retry. If we handle
117      // all the following requests, finally the last request is also timeout. If we drop all
118      // timeout requests, we can handle the last request immediately and it will not timeout.
119      if (ct.incrementAndGet() <= 1) {
120        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
121      } else {
122        Threads.sleep(SLEEP_TIME);
123      }
124    }
125  }
126
127  @BeforeClass
128  public static void setUpBeforeClass() throws Exception {
129    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
130    // Up the handlers; this test needs more than usual.
131    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
132    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
133    // simulate queue blocking in testDropTimeoutRequest
134    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
135    // Needed by the server busy test.
136    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
137    TEST_UTIL.startMiniCluster(2);
138  }
139
140  @AfterClass
141  public static void tearDownAfterClass() throws Exception {
142    TEST_UTIL.shutdownMiniCluster();
143  }
144
145  private static class TestPutThread extends Thread {
146    Table table;
147    int getServerBusyException = 0;
148
149    TestPutThread(Table table) {
150      this.table = table;
151    }
152
153    @Override
154    public void run() {
155      try {
156        Put p = new Put(ROW);
157        p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
158        table.put(p);
159      } catch (ServerTooBusyException e) {
160        getServerBusyException = 1;
161      } catch (IOException ignore) {
162      }
163    }
164  }
165
166  private static class TestGetThread extends Thread {
167    Table table;
168    int getServerBusyException = 0;
169
170    TestGetThread(Table table) {
171      this.table = table;
172    }
173
174    @Override
175    public void run() {
176      try {
177        Get g = new Get(ROW);
178        g.addColumn(FAM_NAM, new byte[] { 0 });
179        table.get(g);
180      } catch (ServerTooBusyException e) {
181        getServerBusyException = 1;
182      } catch (IOException ignore) {
183      }
184    }
185  }
186
187  @Test()
188  public void testServerBusyException() throws Exception {
189    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName()));
190    hdt.addCoprocessor(SleepCoprocessor.class.getName());
191    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
192    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
193
194    TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
195    TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
196    TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
197    TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
198    TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
199    tg1.start();
200    tg2.start();
201    tg3.start();
202    tg4.start();
203    tg5.start();
204    tg1.join();
205    tg2.join();
206    tg3.join();
207    tg4.join();
208    tg5.join();
209    assertEquals(2, tg1.getServerBusyException + tg2.getServerBusyException
210      + tg3.getServerBusyException + tg4.getServerBusyException + tg5.getServerBusyException);
211
212    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
213    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
214
215    TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
216    TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
217    TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
218    TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
219    TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
220    tp1.start();
221    tp2.start();
222    tp3.start();
223    tp4.start();
224    tp5.start();
225    tp1.join();
226    tp2.join();
227    tp3.join();
228    tp4.join();
229    tp5.join();
230    assertEquals(2, tp1.getServerBusyException + tp2.getServerBusyException
231      + tp3.getServerBusyException + tp4.getServerBusyException + tp5.getServerBusyException);
232  }
233}