/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Tests that client requests fail with {@link ServerTooBusyException} once more requests than
 * {@link HConstants#HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD} are in flight at the same time.
 * Be careful adding to this class: it sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD for
 * the whole mini cluster.
 */
@Category({ LargeTests.class })
public class TestServerBusyException {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestServerBusyException.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  /** Value configured for HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD in this test. */
  private static final int PER_SERVER_REQUESTS_THRESHOLD = 3;
  /** Number of client threads issuing the same kind of request at the same time. */
  private static final int CONCURRENT_REQUESTS = 5;
  /**
   * Number of workers expected to see a {@link ServerTooBusyException}: the requests that exceed
   * the configured threshold while the other ones are still blocked in the coprocessor.
   */
  private static final int EXPECTED_SERVER_BUSY =
    CONCURRENT_REQUESTS - PER_SERVER_REQUESTS_THRESHOLD;

  @Rule
  public TestName name = new TestName();

  /**
   * Region observer that sleeps for {@link #SLEEP_TIME} before every Get, Put, Increment and
   * Delete, so that requests stay in flight long enough to pile up.
   */
  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
    public static final int SLEEP_TIME = 5000;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

    @Override
    public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Increment increment) throws IOException {
      Threads.sleep(SLEEP_TIME);
      return null;
    }

    @Override
    public void preDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

  }

  /**
   * Region observer whose first Get sleeps {@code SLEEP_TIME * RPC_RETRY * 2} and whose later Gets
   * sleep {@link #SLEEP_TIME}. The invocation counter is static, so it is shared by every
   * instance of this coprocessor in the JVM.
   */
  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
    public static final int SLEEP_TIME = 2000;
    static final AtomicLong ct = new AtomicLong(0);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      // After the first sleep, all requests have timed out except the last retry. If we handled
      // all of the following requests, the last request would time out as well. If we drop all
      // timed-out requests, we can handle the last request immediately and it will not time out.
      if (ct.incrementAndGet() <= 1) {
        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
      } else {
        Threads.sleep(SLEEP_TIME);
      }
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    // simulate queue blocking in testDropTimeoutRequest
    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
    // Needed by the server busy test.
    conf.setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
      PER_SERVER_REQUESTS_THRESHOLD);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Worker that issues a single request against its own {@link Table}, records whether that
   * request failed with {@link ServerTooBusyException}, and closes the table when done.
   */
  private abstract static class TestRequestThread extends Thread {
    final Table table;
    /** 1 if the request failed with {@link ServerTooBusyException}, otherwise 0. */
    int getServerBusyException = 0;

    TestRequestThread(Table table) {
      this.table = table;
    }

    /**
     * Issues the single request under test.
     * @param t the table to send the request to
     * @throws IOException if the request fails
     */
    abstract void doRequest(Table t) throws IOException;

    @Override
    public void run() {
      // The worker owns the table it was handed, so release it once the request has finished.
      try (Table t = table) {
        doRequest(t);
      } catch (ServerTooBusyException e) {
        getServerBusyException = 1;
      } catch (IOException ignore) {
        // Only ServerTooBusyException is counted; any other failure is irrelevant to this test.
      }
    }
  }

  /** Worker that puts one cell into {@link #ROW}. */
  private static class TestPutThread extends TestRequestThread {

    TestPutThread(Table table) {
      super(table);
    }

    @Override
    void doRequest(Table t) throws IOException {
      Put p = new Put(ROW);
      p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
      t.put(p);
    }
  }

  /** Worker that gets one column of {@link #ROW}. */
  private static class TestGetThread extends TestRequestThread {

    TestGetThread(Table table) {
      super(table);
    }

    @Override
    void doRequest(Table t) throws IOException {
      Get g = new Get(ROW);
      g.addColumn(FAM_NAM, new byte[] { 0 });
      t.get(g);
    }
  }

  /**
   * Starts every worker, waits for all of them to finish and sums up how many of them saw a
   * {@link ServerTooBusyException}. Reading the counters after {@link Thread#join()} is safe
   * without further synchronization.
   * @param workers the workers to run; all of them are started before any of them is joined
   * @return the number of workers whose request failed with {@link ServerTooBusyException}
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  private static int runAndCountServerBusy(TestRequestThread[] workers)
    throws InterruptedException {
    for (TestRequestThread worker : workers) {
      worker.start();
    }
    for (TestRequestThread worker : workers) {
      worker.join();
    }
    int busy = 0;
    for (TestRequestThread worker : workers) {
      busy += worker.getServerBusyException;
    }
    return busy;
  }

  /**
   * Opens a fresh {@link Table} on the shared test connection.
   * @param htd descriptor of the table to open
   * @return a new table instance; the caller is responsible for closing it
   * @throws IOException if the table cannot be obtained
   */
  private static Table openTable(TableDescriptor htd) throws IOException {
    return TEST_UTIL.getConnection().getTable(htd.getTableName());
  }

  /**
   * Fires {@link #CONCURRENT_REQUESTS} concurrent Gets, then the same number of concurrent Puts,
   * at a table whose coprocessor delays every request, and checks that exactly
   * {@link #EXPECTED_SERVER_BUSY} requests of each kind fail with {@link ServerTooBusyException}.
   */
  @Test
  public void testServerBusyException() throws Exception {
    TableDescriptor hdt = TEST_UTIL.createModifyableTableDescriptor(name.getMethodName())
      .setCoprocessor(SleepCoprocessor.class.getName()).build();
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);

    TestRequestThread[] getters = new TestRequestThread[CONCURRENT_REQUESTS];
    for (int i = 0; i < getters.length; i++) {
      getters[i] = new TestGetThread(openTable(hdt));
    }
    assertEquals(EXPECTED_SERVER_BUSY, runAndCountServerBusy(getters));

    // Put has its own logic in HTable, so test Put separately. AsyncProcess is used for Put
    // (multi at the RPC level) and it wraps exceptions in RetriesExhaustedWithDetailsException.
    // NOTE(review): the workers catch ServerTooBusyException directly; confirm the wrapping
    // described above still applies to the client implementation in use.
    TestRequestThread[] putters = new TestRequestThread[CONCURRENT_REQUESTS];
    for (int i = 0; i < putters.length; i++) {
      putters[i] = new TestPutThread(openTable(hdt));
    }
    assertEquals(EXPECTED_SERVER_BUSY, runAndCountServerBusy(putters));
  }
}