001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.coprocessor.ObserverContext;
031import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
032import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
033import org.apache.hadoop.hbase.coprocessor.RegionObserver;
034import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
035import org.apache.hadoop.hbase.testclassification.LargeTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.hadoop.hbase.wal.WALEdit;
039import org.junit.jupiter.api.AfterAll;
040import org.junit.jupiter.api.BeforeAll;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043import org.junit.jupiter.api.TestInfo;
044
045/**
046 * This class is for testing HBaseConnectionManager ServerBusyException. Be careful adding to this
047 * class. It sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
048 */
049@Tag(LargeTests.TAG)
050public class TestServerBusyException {
051
052  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
053  private static final byte[] FAM_NAM = Bytes.toBytes("f");
054  private static final byte[] ROW = Bytes.toBytes("bbb");
055  private static final int RPC_RETRY = 5;
056
057  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
058    public static final int SLEEP_TIME = 5000;
059
060    @Override
061    public Optional<RegionObserver> getRegionObserver() {
062      return Optional.of(this);
063    }
064
065    @Override
066    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
067      final Get get, final List<Cell> results) throws IOException {
068      Threads.sleep(SLEEP_TIME);
069    }
070
071    @Override
072    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
073      final Put put, final WALEdit edit, final Durability durability) throws IOException {
074      Threads.sleep(SLEEP_TIME);
075    }
076
077    @Override
078    public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
079      final Increment increment) throws IOException {
080      Threads.sleep(SLEEP_TIME);
081      return null;
082    }
083
084    @Override
085    public void preDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
086      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
087      Threads.sleep(SLEEP_TIME);
088    }
089
090  }
091
092  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
093    public static final int SLEEP_TIME = 2000;
094    static final AtomicLong ct = new AtomicLong(0);
095
096    @Override
097    public Optional<RegionObserver> getRegionObserver() {
098      return Optional.of(this);
099    }
100
101    @Override
102    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
103      final Get get, final List<Cell> results) throws IOException {
104      // After first sleep, all requests are timeout except the last retry. If we handle
105      // all the following requests, finally the last request is also timeout. If we drop all
106      // timeout requests, we can handle the last request immediately and it will not timeout.
107      if (ct.incrementAndGet() <= 1) {
108        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
109      } else {
110        Threads.sleep(SLEEP_TIME);
111      }
112    }
113  }
114
115  @BeforeAll
116  public static void setUpBeforeClass() throws Exception {
117    TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
118    // Up the handlers; this test needs more than usual.
119    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
120    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
121    // simulate queue blocking in testDropTimeoutRequest
122    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
123    // Needed by the server busy test.
124    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
125    TEST_UTIL.startMiniCluster(2);
126  }
127
128  @AfterAll
129  public static void tearDownAfterClass() throws Exception {
130    TEST_UTIL.shutdownMiniCluster();
131  }
132
133  private static class TestPutThread extends Thread {
134    Table table;
135    int getServerBusyException = 0;
136
137    TestPutThread(Table table) {
138      this.table = table;
139    }
140
141    @Override
142    public void run() {
143      try {
144        Put p = new Put(ROW);
145        p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
146        table.put(p);
147      } catch (ServerTooBusyException e) {
148        getServerBusyException = 1;
149      } catch (IOException ignore) {
150      }
151    }
152  }
153
154  private static class TestGetThread extends Thread {
155    Table table;
156    int getServerBusyException = 0;
157
158    TestGetThread(Table table) {
159      this.table = table;
160    }
161
162    @Override
163    public void run() {
164      try {
165        Get g = new Get(ROW);
166        g.addColumn(FAM_NAM, new byte[] { 0 });
167        table.get(g);
168      } catch (ServerTooBusyException e) {
169        getServerBusyException = 1;
170      } catch (IOException ignore) {
171      }
172    }
173  }
174
175  @Test()
176  public void testServerBusyException(TestInfo testInfo) throws Exception {
177    TableDescriptor hdt =
178      TEST_UTIL.createModifyableTableDescriptor(testInfo.getTestMethod().get().getName())
179        .setCoprocessor(SleepCoprocessor.class.getName()).build();
180    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
181    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
182
183    TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
184    TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
185    TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
186    TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
187    TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
188    tg1.start();
189    tg2.start();
190    tg3.start();
191    tg4.start();
192    tg5.start();
193    tg1.join();
194    tg2.join();
195    tg3.join();
196    tg4.join();
197    tg5.join();
198    assertEquals(2, tg1.getServerBusyException + tg2.getServerBusyException
199      + tg3.getServerBusyException + tg4.getServerBusyException + tg5.getServerBusyException);
200
201    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
202    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
203
204    TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
205    TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
206    TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
207    TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
208    TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
209    tp1.start();
210    tp2.start();
211    tp3.start();
212    tp4.start();
213    tp5.start();
214    tp1.join();
215    tp2.join();
216    tp3.join();
217    tp4.join();
218    tp5.join();
219    assertEquals(2, tp1.getServerBusyException + tp2.getServerBusyException
220      + tp3.getServerBusyException + tp4.getServerBusyException + tp5.getServerBusyException);
221  }
222}