001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.net.SocketTimeoutException; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.ConnectionFactory; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.ResultScanner; 031import org.apache.hadoop.hbase.client.RetriesExhaustedException; 032import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; 033import org.apache.hadoop.hbase.client.Scan; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 036import org.apache.hadoop.hbase.ipc.CallTimeoutException; 037import org.apache.hadoop.hbase.regionserver.HRegionServer; 038import org.apache.hadoop.hbase.regionserver.RSRpcServices; 039import org.apache.hadoop.hbase.testclassification.ClientTests; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.util.Bytes; 042import org.junit.AfterClass; 043import org.junit.Assert; 044import org.junit.Before; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 051import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 052import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 053 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 055 056/** 057 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and 058 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which 059 * injects delays to get, scan and mutate operations. 060 * <p/> 061 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the 062 * client will retry the operation 'hbase.client.retries.number' times. After that 063 * {@link SocketTimeoutException} will be thrown. 064 * <p/> 065 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be 066 * specified for scan related operations such as openScanner(), next(). If that times out 067 * {@link RetriesExhaustedException} will be thrown. 068 */ 069@Category({ ClientTests.class, MediumTests.class }) 070public class TestClientOperationTimeout { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestClientOperationTimeout.class); 075 076 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 077 078 // Activate the delays after table creation to test get/scan/put 079 private static int DELAY_GET; 080 private static int DELAY_SCAN; 081 private static int DELAY_MUTATE; 082 private static int DELAY_BATCH_MUTATE; 083 084 private static final TableName TABLE_NAME = TableName.valueOf("Timeout"); 085 private static final byte[] FAMILY = Bytes.toBytes("family"); 086 private static final byte[] ROW = Bytes.toBytes("row"); 087 private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 088 private static final byte[] VALUE = Bytes.toBytes("value"); 089 090 private static Connection CONN; 091 private static Table TABLE; 092 093 @BeforeClass 094 public static void setUpClass() throws Exception { 095 // Set RegionServer class and use default values for other options. 096 StartMiniClusterOption option = 097 StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build(); 098 UTIL.startMiniCluster(option); 099 UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 100 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); 101 102 Configuration conf = new Configuration(UTIL.getConfiguration()); 103 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500); 104 conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500); 105 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500); 106 conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 107 CONN = ConnectionFactory.createConnection(conf); 108 TABLE = CONN.getTable(TABLE_NAME); 109 } 110 111 @Before 112 public void setUp() throws Exception { 113 DELAY_GET = 0; 114 DELAY_SCAN = 0; 115 DELAY_MUTATE = 0; 116 DELAY_BATCH_MUTATE = 0; 117 } 118 119 @AfterClass 120 public static void tearDown() throws Exception { 121 Closeables.close(TABLE, true); 122 Closeables.close(CONN, true); 123 UTIL.shutdownMiniCluster(); 124 } 125 126 /** 127 * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes 128 * longer than 'hbase.client.operation.timeout'. 129 */ 130 @Test 131 public void testGetTimeout() { 132 DELAY_GET = 600; 133 try { 134 TABLE.get(new Get(ROW)); 135 Assert.fail("should not reach here"); 136 } catch (Exception e) { 137 Assert.assertTrue( 138 e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException); 139 } 140 } 141 142 /** 143 * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes 144 * longer than 'hbase.client.operation.timeout'. 145 */ 146 @Test 147 public void testPutTimeout() { 148 DELAY_MUTATE = 600; 149 Put put = new Put(ROW); 150 put.addColumn(FAMILY, QUALIFIER, VALUE); 151 try { 152 TABLE.put(put); 153 Assert.fail("should not reach here"); 154 } catch (Exception e) { 155 Assert.assertTrue( 156 e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException); 157 } 158 } 159 160 /** 161 * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the 162 * operation takes longer than 'hbase.client.operation.timeout'. 163 */ 164 @Test 165 public void testMultiPutsTimeout() { 166 DELAY_BATCH_MUTATE = 600; 167 Put put1 = new Put(ROW); 168 put1.addColumn(FAMILY, QUALIFIER, VALUE); 169 Put put2 = new Put(ROW); 170 put2.addColumn(FAMILY, QUALIFIER, VALUE); 171 List<Put> puts = new ArrayList<>(); 172 puts.add(put1); 173 puts.add(put2); 174 try { 175 TABLE.batch(puts, new Object[2]); 176 Assert.fail("should not reach here"); 177 } catch (Exception e) { 178 Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException); 179 } 180 } 181 182 /** 183 * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes 184 * longer than 'hbase.client.scanner.timeout.period'. 185 */ 186 @Test 187 public void testScanTimeout() { 188 DELAY_SCAN = 600; 189 try { 190 ResultScanner scanner = TABLE.getScanner(new Scan()); 191 scanner.next(); 192 Assert.fail("should not reach here"); 193 } catch (Exception e) { 194 Assert.assertTrue( 195 e instanceof RetriesExhaustedException && e.getCause() instanceof SocketTimeoutException); 196 } 197 } 198 199 private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { 200 public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException { 201 super(conf); 202 } 203 204 @Override 205 protected RSRpcServices createRpcServices() throws IOException { 206 return new DelayedRSRpcServices(this); 207 } 208 } 209 210 /** 211 * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods. 212 */ 213 public static class DelayedRSRpcServices extends RSRpcServices { 214 DelayedRSRpcServices(HRegionServer rs) throws IOException { 215 super(rs); 216 } 217 218 @Override 219 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 220 throws ServiceException { 221 try { 222 Thread.sleep(DELAY_GET); 223 } catch (InterruptedException e) { 224 LOG.error("Sleep interrupted during get operation", e); 225 } 226 return super.get(controller, request); 227 } 228 229 @Override 230 public ClientProtos.MutateResponse mutate(RpcController rpcc, 231 ClientProtos.MutateRequest request) throws ServiceException { 232 try { 233 Thread.sleep(DELAY_MUTATE); 234 } catch (InterruptedException e) { 235 LOG.error("Sleep interrupted during mutate operation", e); 236 } 237 return super.mutate(rpcc, request); 238 } 239 240 @Override 241 public ClientProtos.ScanResponse scan(RpcController controller, 242 ClientProtos.ScanRequest request) throws ServiceException { 243 try { 244 Thread.sleep(DELAY_SCAN); 245 } catch (InterruptedException e) { 246 LOG.error("Sleep interrupted during scan operation", e); 247 } 248 return super.scan(controller, request); 249 } 250 251 @Override 252 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 253 throws ServiceException { 254 try { 255 Thread.sleep(DELAY_BATCH_MUTATE); 256 } catch (InterruptedException e) { 257 LOG.error("Sleep interrupted during multi operation", e); 258 } 259 return super.multi(rpcc, request); 260 } 261 } 262}