001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.net.SocketTimeoutException; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.ConnectionFactory; 028import org.apache.hadoop.hbase.client.Get; 029import org.apache.hadoop.hbase.client.Put; 030import org.apache.hadoop.hbase.client.ResultScanner; 031import org.apache.hadoop.hbase.client.RetriesExhaustedException; 032import org.apache.hadoop.hbase.client.Scan; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 035import org.apache.hadoop.hbase.ipc.CallTimeoutException; 036import org.apache.hadoop.hbase.regionserver.HRegionServer; 037import org.apache.hadoop.hbase.regionserver.RSRpcServices; 038import org.apache.hadoop.hbase.testclassification.ClientTests; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.AfterClass; 042import org.junit.Assert; 043import org.junit.Before; 044import org.junit.BeforeClass; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 054 055/** 056 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and 057 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which 058 * injects delays to get, scan and mutate operations. 059 * <p/> 060 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the 061 * client will retry the operation 'hbase.client.retries.number' times. After that 062 * {@link SocketTimeoutException} will be thrown. 063 * <p/> 064 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be 065 * specified for scan related operations such as openScanner(), next(). If that times out 066 * {@link RetriesExhaustedException} will be thrown. 067 */ 068@Category({ ClientTests.class, MediumTests.class }) 069public class TestClientOperationTimeout { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestClientOperationTimeout.class); 074 075 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 076 077 // Activate the delays after table creation to test get/scan/put 078 private static int DELAY_GET; 079 private static int DELAY_SCAN; 080 private static int DELAY_MUTATE; 081 private static int DELAY_BATCH_MUTATE; 082 083 private static final TableName TABLE_NAME = TableName.valueOf("Timeout"); 084 private static final byte[] FAMILY = Bytes.toBytes("family"); 085 private static final byte[] ROW = Bytes.toBytes("row"); 086 private static final byte[] QUALIFIER = Bytes.toBytes("qualifier"); 087 private static final byte[] VALUE = Bytes.toBytes("value"); 088 089 private static Connection CONN; 090 private static Table TABLE; 091 092 @BeforeClass 093 public static void setUpClass() throws Exception { 094 // Set RegionServer class and use default values for other options. 095 StartMiniClusterOption option = 096 StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build(); 097 UTIL.startMiniCluster(option); 098 UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 099 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); 100 101 Configuration conf = new Configuration(UTIL.getConfiguration()); 102 conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500); 103 conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500); 104 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500); 105 conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 106 CONN = ConnectionFactory.createConnection(conf); 107 TABLE = CONN.getTable(TABLE_NAME); 108 } 109 110 @Before 111 public void setUp() throws Exception { 112 DELAY_GET = 0; 113 DELAY_SCAN = 0; 114 DELAY_MUTATE = 0; 115 DELAY_BATCH_MUTATE = 0; 116 } 117 118 @AfterClass 119 public static void tearDown() throws Exception { 120 Closeables.close(TABLE, true); 121 Closeables.close(CONN, true); 122 UTIL.shutdownMiniCluster(); 123 } 124 125 /** 126 * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes 127 * longer than 'hbase.client.operation.timeout'. 128 */ 129 @Test 130 public void testGetTimeout() { 131 DELAY_GET = 600; 132 try { 133 TABLE.get(new Get(ROW)); 134 Assert.fail("should not reach here"); 135 } catch (Exception e) { 136 Assert.assertTrue( 137 e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException); 138 } 139 } 140 141 /** 142 * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes 143 * longer than 'hbase.client.operation.timeout'. 144 */ 145 @Test 146 public void testPutTimeout() { 147 DELAY_MUTATE = 600; 148 Put put = new Put(ROW); 149 put.addColumn(FAMILY, QUALIFIER, VALUE); 150 try { 151 TABLE.put(put); 152 Assert.fail("should not reach here"); 153 } catch (Exception e) { 154 Assert.assertTrue( 155 e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException); 156 } 157 } 158 159 /** 160 * Tests that a batch mutate on a table throws {@link SocketTimeoutException} when the operation 161 * takes longer than 'hbase.client.operation.timeout'. 162 */ 163 @Test 164 public void testMultiPutsTimeout() { 165 DELAY_BATCH_MUTATE = 600; 166 Put put1 = new Put(ROW); 167 put1.addColumn(FAMILY, QUALIFIER, VALUE); 168 Put put2 = new Put(ROW); 169 put2.addColumn(FAMILY, QUALIFIER, VALUE); 170 List<Put> puts = new ArrayList<>(); 171 puts.add(put1); 172 puts.add(put2); 173 try { 174 TABLE.batch(puts, new Object[2]); 175 Assert.fail("should not reach here"); 176 } catch (Exception e) { 177 Assert.assertTrue(e instanceof SocketTimeoutException); 178 } 179 } 180 181 /** 182 * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes 183 * longer than 'hbase.client.scanner.timeout.period'. 184 */ 185 @Test 186 public void testScanTimeout() { 187 DELAY_SCAN = 600; 188 try { 189 ResultScanner scanner = TABLE.getScanner(new Scan()); 190 scanner.next(); 191 Assert.fail("should not reach here"); 192 } catch (Exception e) { 193 Assert.assertTrue( 194 e instanceof RetriesExhaustedException && e.getCause() instanceof SocketTimeoutException); 195 } 196 } 197 198 private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer { 199 public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException { 200 super(conf); 201 } 202 203 @Override 204 protected RSRpcServices createRpcServices() throws IOException { 205 return new DelayedRSRpcServices(this); 206 } 207 } 208 209 /** 210 * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods. 211 */ 212 public static class DelayedRSRpcServices extends RSRpcServices { 213 DelayedRSRpcServices(HRegionServer rs) throws IOException { 214 super(rs); 215 } 216 217 @Override 218 public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request) 219 throws ServiceException { 220 try { 221 Thread.sleep(DELAY_GET); 222 } catch (InterruptedException e) { 223 LOG.error("Sleep interrupted during get operation", e); 224 } 225 return super.get(controller, request); 226 } 227 228 @Override 229 public ClientProtos.MutateResponse mutate(RpcController rpcc, 230 ClientProtos.MutateRequest request) throws ServiceException { 231 try { 232 Thread.sleep(DELAY_MUTATE); 233 } catch (InterruptedException e) { 234 LOG.error("Sleep interrupted during mutate operation", e); 235 } 236 return super.mutate(rpcc, request); 237 } 238 239 @Override 240 public ClientProtos.ScanResponse scan(RpcController controller, 241 ClientProtos.ScanRequest request) throws ServiceException { 242 try { 243 Thread.sleep(DELAY_SCAN); 244 } catch (InterruptedException e) { 245 LOG.error("Sleep interrupted during scan operation", e); 246 } 247 return super.scan(controller, request); 248 } 249 250 @Override 251 public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request) 252 throws ServiceException { 253 try { 254 Thread.sleep(DELAY_BATCH_MUTATE); 255 } catch (InterruptedException e) { 256 LOG.error("Sleep interrupted during multi operation", e); 257 } 258 return super.multi(rpcc, request); 259 } 260 } 261}