001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.coprocessor.ObserverContext;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
033import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
034import org.apache.hadoop.hbase.coprocessor.RegionObserver;
035import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
036import org.apache.hadoop.hbase.testclassification.LargeTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.hadoop.hbase.wal.WALEdit;
040import org.junit.AfterClass;
041import org.junit.BeforeClass;
042import org.junit.ClassRule;
043import org.junit.Rule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.junit.rules.TestName;
047
048/**
049 * This class is for testing HBaseConnectionManager ServerBusyException. Be careful adding to this
050 * class. It sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
051 */
052@Category({ LargeTests.class })
053public class TestServerBusyException {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestServerBusyException.class);
058
059  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
060  private static final byte[] FAM_NAM = Bytes.toBytes("f");
061  private static final byte[] ROW = Bytes.toBytes("bbb");
062  private static final int RPC_RETRY = 5;
063
064  @Rule
065  public TestName name = new TestName();
066
067  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
068    public static final int SLEEP_TIME = 5000;
069
070    @Override
071    public Optional<RegionObserver> getRegionObserver() {
072      return Optional.of(this);
073    }
074
075    @Override
076    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
077      final List<Cell> results) throws IOException {
078      Threads.sleep(SLEEP_TIME);
079    }
080
081    @Override
082    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
083      final WALEdit edit, final Durability durability) throws IOException {
084      Threads.sleep(SLEEP_TIME);
085    }
086
087    @Override
088    public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
089      final Increment increment) throws IOException {
090      Threads.sleep(SLEEP_TIME);
091      return null;
092    }
093
094    @Override
095    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
096      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
097      Threads.sleep(SLEEP_TIME);
098    }
099
100  }
101
102  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
103    public static final int SLEEP_TIME = 2000;
104    static final AtomicLong ct = new AtomicLong(0);
105
106    @Override
107    public Optional<RegionObserver> getRegionObserver() {
108      return Optional.of(this);
109    }
110
111    @Override
112    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
113      final List<Cell> results) throws IOException {
114      // After first sleep, all requests are timeout except the last retry. If we handle
115      // all the following requests, finally the last request is also timeout. If we drop all
116      // timeout requests, we can handle the last request immediately and it will not timeout.
117      if (ct.incrementAndGet() <= 1) {
118        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
119      } else {
120        Threads.sleep(SLEEP_TIME);
121      }
122    }
123  }
124
125  @BeforeClass
126  public static void setUpBeforeClass() throws Exception {
127    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
128    // Up the handlers; this test needs more than usual.
129    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
130    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
131    // simulate queue blocking in testDropTimeoutRequest
132    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
133    // Needed by the server busy test.
134    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
135    TEST_UTIL.startMiniCluster(2);
136  }
137
138  @AfterClass
139  public static void tearDownAfterClass() throws Exception {
140    TEST_UTIL.shutdownMiniCluster();
141  }
142
143  private static class TestPutThread extends Thread {
144    Table table;
145    int getServerBusyException = 0;
146
147    TestPutThread(Table table) {
148      this.table = table;
149    }
150
151    @Override
152    public void run() {
153      try {
154        Put p = new Put(ROW);
155        p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
156        table.put(p);
157      } catch (ServerTooBusyException e) {
158        getServerBusyException = 1;
159      } catch (IOException ignore) {
160      }
161    }
162  }
163
164  private static class TestGetThread extends Thread {
165    Table table;
166    int getServerBusyException = 0;
167
168    TestGetThread(Table table) {
169      this.table = table;
170    }
171
172    @Override
173    public void run() {
174      try {
175        Get g = new Get(ROW);
176        g.addColumn(FAM_NAM, new byte[] { 0 });
177        table.get(g);
178      } catch (ServerTooBusyException e) {
179        getServerBusyException = 1;
180      } catch (IOException ignore) {
181      }
182    }
183  }
184
185  @Test()
186  public void testServerBusyException() throws Exception {
187    TableDescriptor hdt = TEST_UTIL.createModifyableTableDescriptor(name.getMethodName())
188      .setCoprocessor(SleepCoprocessor.class.getName()).build();
189    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
190    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
191
192    TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
193    TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
194    TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
195    TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
196    TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
197    tg1.start();
198    tg2.start();
199    tg3.start();
200    tg4.start();
201    tg5.start();
202    tg1.join();
203    tg2.join();
204    tg3.join();
205    tg4.join();
206    tg5.join();
207    assertEquals(2, tg1.getServerBusyException + tg2.getServerBusyException
208      + tg3.getServerBusyException + tg4.getServerBusyException + tg5.getServerBusyException);
209
210    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
211    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
212
213    TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
214    TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
215    TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
216    TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
217    TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
218    tp1.start();
219    tp2.start();
220    tp3.start();
221    tp4.start();
222    tp5.start();
223    tp1.join();
224    tp2.join();
225    tp3.join();
226    tp4.join();
227    tp5.join();
228    assertEquals(2, tp1.getServerBusyException + tp2.getServerBusyException
229      + tp3.getServerBusyException + tp4.getServerBusyException + tp5.getServerBusyException);
230  }
231}