001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.HTableDescriptor; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.coprocessor.ObserverContext; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 036import org.apache.hadoop.hbase.coprocessor.RegionObserver; 037import org.apache.hadoop.hbase.ipc.ServerTooBusyException; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.hadoop.hbase.wal.WALEdit; 042import org.junit.AfterClass; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Rule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.rules.TestName; 049 050/** 051 * This class is for testing HBaseConnectionManager ServerBusyException. 052 * Be careful adding to this class. It sets a low 053 * HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD 054 */ 055@Category({LargeTests.class}) 056public class TestServerBusyException { 057 058 @ClassRule 059 public static final HBaseClassTestRule CLASS_RULE = 060 HBaseClassTestRule.forClass(TestServerBusyException.class); 061 062 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 063 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 064 private static final byte[] ROW = Bytes.toBytes("bbb"); 065 private static final int RPC_RETRY = 5; 066 067 @Rule 068 public TestName name = new TestName(); 069 070 public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver { 071 public static final int SLEEP_TIME = 5000; 072 @Override 073 public Optional<RegionObserver> getRegionObserver() { 074 return Optional.of(this); 075 } 076 077 @Override 078 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, 079 final Get get, final List<Cell> results) throws IOException { 080 Threads.sleep(SLEEP_TIME); 081 } 082 083 @Override 084 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, 085 final Put put, final WALEdit edit, final Durability durability) throws IOException { 086 Threads.sleep(SLEEP_TIME); 087 } 088 089 @Override 090 public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, 091 final Increment increment) throws IOException { 092 Threads.sleep(SLEEP_TIME); 093 return null; 094 } 095 096 @Override 097 public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, 098 final WALEdit edit, final Durability durability) throws IOException { 099 Threads.sleep(SLEEP_TIME); 100 } 101 102 } 103 104 public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver { 105 public static final int SLEEP_TIME = 2000; 106 static final AtomicLong ct = new AtomicLong(0); 107 108 @Override 109 public Optional<RegionObserver> getRegionObserver() { 110 return Optional.of(this); 111 } 112 113 @Override 114 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, 115 final Get get, final List<Cell> results) throws IOException { 116 // After first sleep, all requests are timeout except the last retry. If we handle 117 // all the following requests, finally the last request is also timeout. If we drop all 118 // timeout requests, we can handle the last request immediately and it will not timeout. 119 if (ct.incrementAndGet() <= 1) { 120 Threads.sleep(SLEEP_TIME * RPC_RETRY * 2); 121 } else { 122 Threads.sleep(SLEEP_TIME); 123 } 124 } 125 } 126 127 @BeforeClass 128 public static void setUpBeforeClass() throws Exception { 129 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 130 // Up the handlers; this test needs more than usual. 131 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 132 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 133 // simulate queue blocking in testDropTimeoutRequest 134 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); 135 // Needed by the server busy test. 136 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3); 137 TEST_UTIL.startMiniCluster(2); 138 } 139 140 @AfterClass public static void tearDownAfterClass() throws Exception { 141 TEST_UTIL.shutdownMiniCluster(); 142 } 143 144 private static class TestPutThread extends Thread { 145 Table table; 146 int getServerBusyException = 0; 147 148 TestPutThread(Table table){ 149 this.table = table; 150 } 151 152 @Override 153 public void run() { 154 try { 155 Put p = new Put(ROW); 156 p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 }); 157 table.put(p); 158 } catch (ServerTooBusyException e) { 159 getServerBusyException = 1; 160 } catch (IOException ignore) { 161 } 162 } 163 } 164 165 private static class TestGetThread extends Thread { 166 Table table; 167 int getServerBusyException = 0; 168 169 TestGetThread(Table table){ 170 this.table = table; 171 } 172 173 @Override 174 public void run() { 175 try { 176 Get g = new Get(ROW); 177 g.addColumn(FAM_NAM, new byte[] { 0 }); 178 table.get(g); 179 } catch (ServerTooBusyException e) { 180 getServerBusyException = 1; 181 } catch (IOException ignore) { 182 } 183 } 184 } 185 186 @Test() 187 public void testServerBusyException() throws Exception { 188 HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName())); 189 hdt.addCoprocessor(SleepCoprocessor.class.getName()); 190 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 191 TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c); 192 193 TestGetThread tg1 = 194 new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 195 TestGetThread tg2 = 196 new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 197 TestGetThread tg3 = 198 new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 199 TestGetThread tg4 = 200 new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 201 TestGetThread tg5 = 202 new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 203 tg1.start(); 204 tg2.start(); 205 tg3.start(); 206 tg4.start(); 207 tg5.start(); 208 tg1.join(); 209 tg2.join(); 210 tg3.join(); 211 tg4.join(); 212 tg5.join(); 213 assertEquals(2, 214 tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException 215 + tg4.getServerBusyException + tg5.getServerBusyException); 216 217 // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at 218 // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException. 219 220 TestPutThread tp1 = 221 new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 222 TestPutThread tp2 = 223 new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 224 TestPutThread tp3 = 225 new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 226 TestPutThread tp4 = 227 new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 228 TestPutThread tp5 = 229 new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 230 tp1.start(); 231 tp2.start(); 232 tp3.start(); 233 tp4.start(); 234 tp5.start(); 235 tp1.join(); 236 tp2.join(); 237 tp3.join(); 238 tp4.join(); 239 tp5.join(); 240 assertEquals(2, 241 tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException 242 + tp4.getServerBusyException + tp5.getServerBusyException); 243 } 244}