/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

/**
 * This class is for testing that clients observe {@link ServerTooBusyException}. Be careful adding
 * to this class. It sets a low HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD for the whole mini
 * cluster.
 */
@Tag(LargeTests.TAG)
public class TestServerBusyException {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final byte[] FAM_NAM = Bytes.toBytes("f");
  private static final byte[] ROW = Bytes.toBytes("bbb");
  private static final int RPC_RETRY = 5;

  /** Number of concurrent client threads started for each operation type. */
  private static final int CLIENT_THREADS = 5;

  /**
   * Number of the {@link #CLIENT_THREADS} threads expected to observe a
   * {@link ServerTooBusyException}, given the per-server request threshold of 3 configured in
   * {@link #setUpBeforeClass()}.
   */
  private static final int EXPECTED_BUSY_THREADS = 2;

  /**
   * Region observer that sleeps {@link #SLEEP_TIME} milliseconds before every get, put, increment
   * and delete, so that concurrent requests stay in flight long enough to overlap.
   */
  public static class SleepCoprocessor implements RegionCoprocessor, RegionObserver {
    public static final int SLEEP_TIME = 5000;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

    @Override
    public void prePut(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Put put, final WALEdit edit, final Durability durability) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

    @Override
    public Result preIncrement(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Increment increment) throws IOException {
      Threads.sleep(SLEEP_TIME);
      // NOTE(review): null presumably lets the regular increment proceed; confirm against the
      // RegionObserver contract.
      return null;
    }

    @Override
    public void preDelete(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
      Threads.sleep(SLEEP_TIME);
    }

  }

  /**
   * Region observer whose first get sleeps {@code SLEEP_TIME * RPC_RETRY * 2} milliseconds and
   * whose later gets sleep {@link #SLEEP_TIME} milliseconds.
   * <p>
   * NOTE(review): no test visible in this file installs this coprocessor; presumably it serves
   * the testDropTimeoutRequest scenario mentioned in {@link #setUpBeforeClass()} -- confirm before
   * removing.
   */
  public static class SleepLongerAtFirstCoprocessor implements RegionCoprocessor, RegionObserver {
    public static final int SLEEP_TIME = 2000;
    /** Counts the gets seen so far; shared by all instances of this coprocessor. */
    static final AtomicLong ct = new AtomicLong(0);

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetOp(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
      final Get get, final List<Cell> results) throws IOException {
      // After first sleep, all requests are timeout except the last retry. If we handle
      // all the following requests, finally the last request is also timeout. If we drop all
      // timeout requests, we can handle the last request immediately and it will not timeout.
      if (ct.incrementAndGet() <= 1) {
        Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
      } else {
        Threads.sleep(SLEEP_TIME);
      }
    }
  }

  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean(HConstants.STATUS_PUBLISHED, true);
    // Up the handlers; this test needs more than usual.
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
    // simulate queue blocking in testDropTimeoutRequest
    conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
    // Needed by the server busy test.
    conf.setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Base for the client threads: runs a single operation against the given table, records
   * whether it failed with {@link ServerTooBusyException}, and closes the table afterwards.
   */
  private abstract static class TestOpThread extends Thread {
    private final Table table;

    /**
     * 1 if the operation failed with {@link ServerTooBusyException}, otherwise 0. Written by this
     * thread only; read it only after {@link #join()} has returned.
     */
    int getServerBusyException = 0;

    TestOpThread(Table table) {
      this.table = table;
    }

    /** Performs the single client operation under test against {@code t}. */
    abstract void execute(Table t) throws IOException;

    @Override
    public void run() {
      // Each thread owns its Table, so release it here once the operation is done.
      try (Table t = table) {
        execute(t);
      } catch (ServerTooBusyException e) {
        getServerBusyException = 1;
      } catch (IOException ignored) {
        // Deliberately ignored: the test only counts ServerTooBusyException occurrences.
      }
    }
  }

  /** Client thread that issues one put of a single cell to {@code ROW}. */
  private static class TestPutThread extends TestOpThread {

    TestPutThread(Table table) {
      super(table);
    }

    @Override
    void execute(Table t) throws IOException {
      Put p = new Put(ROW);
      p.addColumn(FAM_NAM, new byte[] { 0 }, new byte[] { 0 });
      t.put(p);
    }
  }

  /** Client thread that issues one get of a single column from {@code ROW}. */
  private static class TestGetThread extends TestOpThread {

    TestGetThread(Table table) {
      super(table);
    }

    @Override
    void execute(Table t) throws IOException {
      Get g = new Get(ROW);
      g.addColumn(FAM_NAM, new byte[] { 0 });
      t.get(g);
    }
  }

  /**
   * Starts every thread, waits for all of them to finish, and returns how many of them recorded
   * a {@link ServerTooBusyException}.
   */
  private static int runAndCountBusy(TestOpThread[] threads) throws InterruptedException {
    // Start all threads before joining any, so that the requests overlap.
    for (TestOpThread thread : threads) {
      thread.start();
    }
    int busy = 0;
    for (TestOpThread thread : threads) {
      thread.join();
      busy += thread.getServerBusyException;
    }
    return busy;
  }

  /**
   * Creates a table guarded by {@link SleepCoprocessor}, then issues {@link #CLIENT_THREADS}
   * concurrent gets followed by {@link #CLIENT_THREADS} concurrent puts, and checks that in each
   * round exactly {@link #EXPECTED_BUSY_THREADS} threads saw a {@link ServerTooBusyException}.
   */
  @Test
  public void testServerBusyException(TestInfo testInfo) throws Exception {
    TableDescriptor hdt =
      TEST_UTIL.createModifyableTableDescriptor(testInfo.getTestMethod().get().getName())
        .setCoprocessor(SleepCoprocessor.class.getName()).build();
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);

    TestGetThread[] getters = new TestGetThread[CLIENT_THREADS];
    for (int i = 0; i < getters.length; i++) {
      getters[i] = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
    }
    assertEquals(EXPECTED_BUSY_THREADS, runAndCountBusy(getters),
      "unexpected number of gets failing with ServerTooBusyException");

    // Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
    // RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.

    TestPutThread[] putters = new TestPutThread[CLIENT_THREADS];
    for (int i = 0; i < putters.length; i++) {
      putters[i] = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
    }
    assertEquals(EXPECTED_BUSY_THREADS, runAndCountBusy(putters),
      "unexpected number of puts failing with ServerTooBusyException");
  }
}