001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.HTableDescriptor; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.coprocessor.ObserverContext; 034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 036import org.apache.hadoop.hbase.coprocessor.RegionObserver; 037import org.apache.hadoop.hbase.ipc.ServerTooBusyException; 038import org.apache.hadoop.hbase.testclassification.LargeTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.Threads; 041import org.apache.hadoop.hbase.wal.WALEdit; 042import org.junit.AfterClass; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Rule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048import org.junit.rules.TestName; 049 050/** 051 * This class is for testing HBaseConnectionManager ServerBusyException. Be careful adding to this 052 * class. It sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD 053 */ 054@Category({ LargeTests.class }) 055public class TestServerBusyException { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestServerBusyException.class); 060 061 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 062 private static final byte[] FAM_NAM = Bytes.toBytes("f"); 063 private static final byte[] ROW = Bytes.toBytes("bbb"); 064 private static final int RPC_RETRY = 5; 065 066 @Rule 067 public TestName name = new TestName(); 068 069 public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver { 070 public static final int SLEEP_TIME = 5000; 071 072 @Override 073 public Optional<RegionObserver> getRegionObserver() { 074 return Optional.of(this); 075 } 076 077 @Override 078 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, 079 final List<Cell> results) throws IOException { 080 Threads.sleep(SLEEP_TIME); 081 } 082 083 @Override 084 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put, 085 final WALEdit edit, final Durability durability) throws IOException { 086 Threads.sleep(SLEEP_TIME); 087 } 088 089 @Override 090 public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, 091 final Increment increment) throws IOException { 092 Threads.sleep(SLEEP_TIME); 093 return null; 094 } 095 096 @Override 097 public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, 098 final Delete delete, final WALEdit edit, final Durability durability) throws IOException { 099 Threads.sleep(SLEEP_TIME); 100 } 101 102 } 103 104 public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver { 105 public static final int SLEEP_TIME = 2000; 106 static final AtomicLong ct = new AtomicLong(0); 107 108 @Override 109 public Optional<RegionObserver> getRegionObserver() { 110 return Optional.of(this); 111 } 112 113 @Override 114 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, 115 final List<Cell> results) throws IOException { 116 // After first sleep, all requests are timeout except the last retry. If we handle 117 // all the following requests, finally the last request is also timeout. If we drop all 118 // timeout requests, we can handle the last request immediately and it will not timeout. 119 if (ct.incrementAndGet() <= 1) { 120 Threads.sleep(SLEEP_TIME * RPC_RETRY * 2); 121 } else { 122 Threads.sleep(SLEEP_TIME); 123 } 124 } 125 } 126 127 @BeforeClass 128 public static void setUpBeforeClass() throws Exception { 129 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); 130 // Up the handlers; this test needs more than usual. 131 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); 132 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); 133 // simulate queue blocking in testDropTimeoutRequest 134 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); 135 // Needed by the server busy test. 136 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3); 137 TEST_UTIL.startMiniCluster(2); 138 } 139 140 @AfterClass 141 public static void tearDownAfterClass() throws Exception { 142 TEST_UTIL.shutdownMiniCluster(); 143 } 144 145 private static class TestPutThread extends Thread { 146 Table table; 147 int getServerBusyException = 0; 148 149 TestPutThread(Table table) { 150 this.table = table; 151 } 152 153 @Override 154 public void run() { 155 try { 156 Put p = new Put(ROW); 157 p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 }); 158 table.put(p); 159 } catch (ServerTooBusyException e) { 160 getServerBusyException = 1; 161 } catch (IOException ignore) { 162 } 163 } 164 } 165 166 private static class TestGetThread extends Thread { 167 Table table; 168 int getServerBusyException = 0; 169 170 TestGetThread(Table table) { 171 this.table = table; 172 } 173 174 @Override 175 public void run() { 176 try { 177 Get g = new Get(ROW); 178 g.addColumn(FAM_NAM, new byte[] { 0 }); 179 table.get(g); 180 } catch (ServerTooBusyException e) { 181 getServerBusyException = 1; 182 } catch (IOException ignore) { 183 } 184 } 185 } 186 187 @Test() 188 public void testServerBusyException() throws Exception { 189 HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(TableName.valueOf(name.getMethodName())); 190 hdt.addCoprocessor(SleepCoprocessor.class.getName()); 191 Configuration c = new Configuration(TEST_UTIL.getConfiguration()); 192 TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c); 193 194 TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 195 TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 196 TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 197 TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 198 TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 199 tg1.start(); 200 tg2.start(); 201 tg3.start(); 202 tg4.start(); 203 tg5.start(); 204 tg1.join(); 205 tg2.join(); 206 tg3.join(); 207 tg4.join(); 208 tg5.join(); 209 assertEquals(2, tg1.getServerBusyException + tg2.getServerBusyException 210 + tg3.getServerBusyException + tg4.getServerBusyException + tg5.getServerBusyException); 211 212 // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at 213 // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException. 214 215 TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 216 TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 217 TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 218 TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 219 TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName())); 220 tp1.start(); 221 tp2.start(); 222 tp3.start(); 223 tp4.start(); 224 tp5.start(); 225 tp1.join(); 226 tp2.join(); 227 tp3.join(); 228 tp4.join(); 229 tp5.join(); 230 assertEquals(2, tp1.getServerBusyException + tp2.getServerBusyException 231 + tp3.getServerBusyException + tp4.getServerBusyException + tp5.getServerBusyException); 232 } 233}