001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.net.SocketTimeoutException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.RegionLocator;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.RetriesExhaustedException;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.ipc.CallTimeoutException;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.regionserver.RSRpcServices;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.BeforeAll;
048import org.junit.jupiter.api.BeforeEach;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
055import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
056import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
057
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
059
060/**
061 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
062 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
063 * injects delays to get, scan and mutate operations.
064 * <p/>
065 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
066 * client will retry the operation 'hbase.client.retries.number' times. After that
067 * {@link SocketTimeoutException} will be thrown.
068 * <p/>
069 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
070 * specified for scan related operations such as openScanner(), next(). If that times out
071 * {@link RetriesExhaustedException} will be thrown.
072 */
073@Tag(ClientTests.TAG)
074@Tag(MediumTests.TAG)
075public class TestClientOperationTimeout {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationTimeout.class);
078
079  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
080
081  // Activate the delays after table creation to test get/scan/put
082  private static int DELAY_GET;
083  private static int DELAY_SCAN;
084  private static int DELAY_MUTATE;
085  private static int DELAY_BATCH_MUTATE;
086
087  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
088  private static final byte[] FAMILY = Bytes.toBytes("family");
089  private static final byte[] ROW = Bytes.toBytes("row");
090  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
091  private static final byte[] VALUE = Bytes.toBytes("value");
092
093  private static Connection CONN;
094  private static Table TABLE;
095
096  @BeforeAll
097  public static void setUp() throws Exception {
098    // Set RegionServer class and use default values for other options.
099    StartTestingClusterOption option =
100      StartTestingClusterOption.builder().rsClass(DelayedRegionServer.class).build();
101    UTIL.startMiniCluster(option);
102    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
103      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
104
105    Configuration conf = new Configuration(UTIL.getConfiguration());
106    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
107    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
108    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
109    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
110    CONN = ConnectionFactory.createConnection(conf);
111    TABLE = CONN.getTable(TABLE_NAME);
112  }
113
114  @AfterAll
115  public static void tearDown() throws Exception {
116    Closeables.close(TABLE, true);
117    Closeables.close(CONN, true);
118    UTIL.shutdownMiniCluster();
119  }
120
121  @BeforeEach
122  public void setUpBeforeTest() throws Exception {
123    DELAY_GET = 0;
124    DELAY_SCAN = 0;
125    DELAY_MUTATE = 0;
126    DELAY_BATCH_MUTATE = 0;
127  }
128
129  /**
130   * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
131   * longer than 'hbase.client.operation.timeout'.
132   */
133  @Test
134  public void testGetTimeout() {
135    DELAY_GET = 600;
136    try {
137      TABLE.get(new Get(ROW));
138      fail("should not reach here");
139    } catch (Exception e) {
140      LOG.info("Got exception for get", e);
141      assertThat(e, instanceOf(RetriesExhaustedException.class));
142      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
143    }
144  }
145
146  /**
147   * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
148   * longer than 'hbase.client.operation.timeout'.
149   */
150  @Test
151  public void testPutTimeout() {
152    DELAY_MUTATE = 600;
153    Put put = new Put(ROW);
154    put.addColumn(FAMILY, QUALIFIER, VALUE);
155    try {
156      TABLE.put(put);
157      fail("should not reach here");
158    } catch (Exception e) {
159      LOG.info("Got exception for put", e);
160      assertThat(e, instanceOf(RetriesExhaustedException.class));
161      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
162    }
163  }
164
165  /**
166   * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the
167   * operation takes longer than 'hbase.client.operation.timeout'.
168   */
169  @Test
170  public void testMultiPutsTimeout() {
171    DELAY_BATCH_MUTATE = 600;
172    Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
173    Put put2 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
174    List<Put> puts = Arrays.asList(put1, put2);
175    try {
176      TABLE.batch(puts, new Object[2]);
177      fail("should not reach here");
178    } catch (Exception e) {
179      LOG.info("Got exception for batch", e);
180      assertThat(e, instanceOf(RetriesExhaustedException.class));
181      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
182      assertThat(e.getCause().getCause(), instanceOf(CallTimeoutException.class));
183    }
184  }
185
186  /**
187   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
188   * longer than 'hbase.client.scanner.timeout.period'.
189   */
190  @Test
191  public void testScanTimeout() throws IOException, InterruptedException {
192    // cache the region location.
193    try (RegionLocator locator = TABLE.getRegionLocator()) {
194      locator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
195    }
196    // sleep a bit to make sure the location has been cached as it is an async operation.
197    Thread.sleep(100);
198    DELAY_SCAN = 600;
199    try (ResultScanner scanner = TABLE.getScanner(new Scan())) {
200      scanner.next();
201      fail("should not reach here");
202    } catch (Exception e) {
203      LOG.info("Got exception for scan", e);
204      assertThat(e, instanceOf(RetriesExhaustedException.class));
205      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
206    }
207  }
208
209  public static final class DelayedRegionServer
210    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
211    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
212      super(conf);
213    }
214
215    @Override
216    protected RSRpcServices createRpcServices() throws IOException {
217      return new DelayedRSRpcServices(this);
218    }
219  }
220
221  /**
222   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
223   */
224  private static final class DelayedRSRpcServices extends RSRpcServices {
225    DelayedRSRpcServices(HRegionServer rs) throws IOException {
226      super(rs);
227    }
228
229    @Override
230    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
231      throws ServiceException {
232      try {
233        Thread.sleep(DELAY_GET);
234      } catch (InterruptedException e) {
235        LOG.error("Sleep interrupted during get operation", e);
236      }
237      return super.get(controller, request);
238    }
239
240    @Override
241    public ClientProtos.MutateResponse mutate(RpcController rpcc,
242      ClientProtos.MutateRequest request) throws ServiceException {
243      try {
244        Thread.sleep(DELAY_MUTATE);
245      } catch (InterruptedException e) {
246        LOG.error("Sleep interrupted during mutate operation", e);
247      }
248      return super.mutate(rpcc, request);
249    }
250
251    @Override
252    public ClientProtos.ScanResponse scan(RpcController controller,
253      ClientProtos.ScanRequest request) throws ServiceException {
254      try {
255        Thread.sleep(DELAY_SCAN);
256      } catch (InterruptedException e) {
257        LOG.error("Sleep interrupted during scan operation", e);
258      }
259      return super.scan(controller, request);
260    }
261
262    @Override
263    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
264      throws ServiceException {
265      try {
266        Thread.sleep(DELAY_BATCH_MUTATE);
267      } catch (InterruptedException e) {
268        LOG.error("Sleep interrupted during multi operation", e);
269      }
270      return super.multi(rpcc, request);
271    }
272  }
273}