001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.net.SocketTimeoutException; 026import java.util.Arrays; 027import java.util.List; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Connection; 031import org.apache.hadoop.hbase.client.ConnectionFactory; 032import org.apache.hadoop.hbase.client.Get; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.RegionLocator; 035import org.apache.hadoop.hbase.client.ResultScanner; 036import org.apache.hadoop.hbase.client.RetriesExhaustedException; 037import org.apache.hadoop.hbase.client.Scan; 038import org.apache.hadoop.hbase.client.Table; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.ipc.CallTimeoutException; 041import org.apache.hadoop.hbase.regionserver.HRegionServer; 042import org.apache.hadoop.hbase.regionserver.RSRpcServices; 043import org.apache.hadoop.hbase.testclassification.ClientTests; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.util.Bytes; 046import org.junit.jupiter.api.AfterAll; 047import org.junit.jupiter.api.BeforeAll; 048import org.junit.jupiter.api.BeforeEach; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 057 058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 059 060/** 061 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and 062 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which 063 * injects delays to get, scan and mutate operations. 064 * <p/> 065 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the 066 * client will retry the operation 'hbase.client.retries.number' times. After that 067 * {@link SocketTimeoutException} will be thrown. 068 * <p/> 069 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be 070 * specified for scan related operations such as openScanner(), next(). If that times out 071 * {@link RetriesExhaustedException} will be thrown. 072 */ 073@Tag(ClientTests.TAG) 074@Tag(MediumTests.TAG) 075public class TestClientOperationTimeout { 076 077 private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationTimeout.class); 078 079 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 080 081 // Activate the delays after table creation to test get/scan/put 082 private static int DELAY_GET; 083 private static int DELAY_SCAN; 084 private static int DELAY_MUTATE; 085 private static int DELAY_BATCH_MUTATE; 086 087 private static final TableName TABLE_NAME = TableName.valueOf("Timeout"); 088 private static final byte[] FAMILY = Bytes.toBytes("family"); 089 private static final byte[] ROW = Bytes.toBytes("row"); 090 private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 091 private static final byte[] VALUE = Bytes.toBytes("value"); 092 093 private static Connection CONN; 094 private static Table TABLE; 095 096 @BeforeAll 097 public static void setUp() throws Exception { 098 // Set RegionServer class and use default values for other options. 099 StartTestingClusterOption option = 100 StartTestingClusterOption.builder().rsClass(DelayedRegionServer.class).build(); 101 UTIL.startMiniCluster(option); 102 UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 103 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); 104 105 Configuration conf = new Configuration(UTIL.getConfiguration()); 106 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500); 107 conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500); 108 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500); 109 conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 110 CONN = ConnectionFactory.createConnection(conf); 111 TABLE = CONN.getTable(TABLE_NAME); 112 } 113 114 @AfterAll 115 public static void tearDown() throws Exception { 116 Closeables.close(TABLE, true); 117 Closeables.close(CONN, true); 118 UTIL.shutdownMiniCluster(); 119 } 120 121 @BeforeEach 122 public void setUpBeforeTest() throws Exception { 123 DELAY_GET = 0; 124 DELAY_SCAN = 0; 125 DELAY_MUTATE = 0; 126 DELAY_BATCH_MUTATE = 0; 127 } 128 129 /** 130 * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes 131 * longer than 'hbase.client.operation.timeout'. 132 */ 133 @Test 134 public void testGetTimeout() { 135 DELAY_GET = 600; 136 try { 137 TABLE.get(new Get(ROW)); 138 fail("should not reach here"); 139 } catch (Exception e) { 140 LOG.info("Got exception for get", e); 141 assertThat(e, instanceOf(RetriesExhaustedException.class)); 142 assertThat(e.getCause(), instanceOf(CallTimeoutException.class)); 143 } 144 } 145 146 /** 147 * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes 148 * longer than 'hbase.client.operation.timeout'. 149 */ 150 @Test 151 public void testPutTimeout() { 152 DELAY_MUTATE = 600; 153 Put put = new Put(ROW); 154 put.addColumn(FAMILY, QUALIFIER, VALUE); 155 try { 156 TABLE.put(put); 157 fail("should not reach here"); 158 } catch (Exception e) { 159 LOG.info("Got exception for put", e); 160 assertThat(e, instanceOf(RetriesExhaustedException.class)); 161 assertThat(e.getCause(), instanceOf(CallTimeoutException.class)); 162 } 163 } 164 165 /** 166 * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the 167 * operation takes longer than 'hbase.client.operation.timeout'. 168 */ 169 @Test 170 public void testMultiPutsTimeout() { 171 DELAY_BATCH_MUTATE = 600; 172 Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); 173 Put put2 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE); 174 List<Put> puts = Arrays.asList(put1, put2); 175 try { 176 TABLE.batch(puts, new Object[2]); 177 fail("should not reach here"); 178 } catch (Exception e) { 179 LOG.info("Got exception for batch", e); 180 assertThat(e, instanceOf(RetriesExhaustedException.class)); 181 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); 182 assertThat(e.getCause().getCause(), instanceOf(CallTimeoutException.class)); 183 } 184 } 185 186 /** 187 * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes 188 * longer than 'hbase.client.scanner.timeout.period'. 189 */ 190 @Test 191 public void testScanTimeout() throws IOException, InterruptedException { 192 // cache the region location. 193 try (RegionLocator locator = TABLE.getRegionLocator()) { 194 locator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY); 195 } 196 // sleep a bit to make sure the location has been cached as it is an async operation. 197 Thread.sleep(100); 198 DELAY_SCAN = 600; 199 try (ResultScanner scanner = TABLE.getScanner(new Scan())) { 200 scanner.next(); 201 fail("should not reach here"); 202 } catch (Exception e) { 203 LOG.info("Got exception for scan", e); 204 assertThat(e, instanceOf(RetriesExhaustedException.class)); 205 assertThat(e.getCause(), instanceOf(CallTimeoutException.class)); 206 } 207 } 208 209 public static final class DelayedRegionServer 210 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 211 public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException { 212 super(conf); 213 } 214 215 @Override 216 protected RSRpcServices createRpcServices() throws IOException { 217 return new DelayedRSRpcServices(this); 218 } 219 } 220 221 /** 222 * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods. 223 */ 224 private static final class DelayedRSRpcServices extends RSRpcServices { 225 DelayedRSRpcServices(HRegionServer rs) throws IOException { 226 super(rs); 227 } 228 229 @Override 230 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 231 throws ServiceException { 232 try { 233 Thread.sleep(DELAY_GET); 234 } catch (InterruptedException e) { 235 LOG.error("Sleep interrupted during get operation", e); 236 } 237 return super.get(controller, request); 238 } 239 240 @Override 241 public ClientProtos.MutateResponse mutate(RpcController rpcc, 242 ClientProtos.MutateRequest request) throws ServiceException { 243 try { 244 Thread.sleep(DELAY_MUTATE); 245 } catch (InterruptedException e) { 246 LOG.error("Sleep interrupted during mutate operation", e); 247 } 248 return super.mutate(rpcc, request); 249 } 250 251 @Override 252 public ClientProtos.ScanResponse scan(RpcController controller, 253 ClientProtos.ScanRequest request) throws ServiceException { 254 try { 255 Thread.sleep(DELAY_SCAN); 256 } catch (InterruptedException e) { 257 LOG.error("Sleep interrupted during scan operation", e); 258 } 259 return super.scan(controller, request); 260 } 261 262 @Override 263 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 264 throws ServiceException { 265 try { 266 Thread.sleep(DELAY_BATCH_MUTATE); 267 } catch (InterruptedException e) { 268 LOG.error("Sleep interrupted during multi operation", e); 269 } 270 return super.multi(rpcc, request); 271 } 272 } 273}