001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.net.SocketTimeoutException;
026import java.util.Arrays;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
030import org.apache.hadoop.hbase.client.Connection;
031import org.apache.hadoop.hbase.client.ConnectionFactory;
032import org.apache.hadoop.hbase.client.Get;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.RegionLocator;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.RetriesExhaustedException;
037import org.apache.hadoop.hbase.client.Scan;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
040import org.apache.hadoop.hbase.ipc.CallTimeoutException;
041import org.apache.hadoop.hbase.regionserver.HRegionServer;
042import org.apache.hadoop.hbase.regionserver.RSRpcServices;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.AfterClass;
047import org.junit.Before;
048import org.junit.BeforeClass;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
056import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
057import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
058
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
060
061/**
062 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
063 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
064 * injects delays to get, scan and mutate operations.
065 * <p/>
066 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
067 * client will retry the operation 'hbase.client.retries.number' times. After that
068 * {@link SocketTimeoutException} will be thrown.
069 * <p/>
070 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
071 * specified for scan related operations such as openScanner(), next(). If that times out
072 * {@link RetriesExhaustedException} will be thrown.
073 */
074@Category({ ClientTests.class, MediumTests.class })
075public class TestClientOperationTimeout {
076
077  private static final Logger LOG = LoggerFactory.getLogger(TestClientOperationTimeout.class);
078
079  @ClassRule
080  public static final HBaseClassTestRule CLASS_RULE =
081    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
082
083  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
084
085  // Activate the delays after table creation to test get/scan/put
086  private static int DELAY_GET;
087  private static int DELAY_SCAN;
088  private static int DELAY_MUTATE;
089  private static int DELAY_BATCH_MUTATE;
090
091  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
092  private static final byte[] FAMILY = Bytes.toBytes("family");
093  private static final byte[] ROW = Bytes.toBytes("row");
094  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
095  private static final byte[] VALUE = Bytes.toBytes("value");
096
097  private static Connection CONN;
098  private static Table TABLE;
099
100  @BeforeClass
101  public static void setUp() throws Exception {
102    // Set RegionServer class and use default values for other options.
103    StartTestingClusterOption option =
104      StartTestingClusterOption.builder().rsClass(DelayedRegionServer.class).build();
105    UTIL.startMiniCluster(option);
106    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
107      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
108
109    Configuration conf = new Configuration(UTIL.getConfiguration());
110    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
111    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
112    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
113    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
114    CONN = ConnectionFactory.createConnection(conf);
115    TABLE = CONN.getTable(TABLE_NAME);
116  }
117
118  @AfterClass
119  public static void tearDown() throws Exception {
120    Closeables.close(TABLE, true);
121    Closeables.close(CONN, true);
122    UTIL.shutdownMiniCluster();
123  }
124
125  @Before
126  public void setUpBeforeTest() throws Exception {
127    DELAY_GET = 0;
128    DELAY_SCAN = 0;
129    DELAY_MUTATE = 0;
130    DELAY_BATCH_MUTATE = 0;
131  }
132
133  /**
134   * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
135   * longer than 'hbase.client.operation.timeout'.
136   */
137  @Test
138  public void testGetTimeout() {
139    DELAY_GET = 600;
140    try {
141      TABLE.get(new Get(ROW));
142      fail("should not reach here");
143    } catch (Exception e) {
144      LOG.info("Got exception for get", e);
145      assertThat(e, instanceOf(RetriesExhaustedException.class));
146      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
147    }
148  }
149
150  /**
151   * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
152   * longer than 'hbase.client.operation.timeout'.
153   */
154  @Test
155  public void testPutTimeout() {
156    DELAY_MUTATE = 600;
157    Put put = new Put(ROW);
158    put.addColumn(FAMILY, QUALIFIER, VALUE);
159    try {
160      TABLE.put(put);
161      fail("should not reach here");
162    } catch (Exception e) {
163      LOG.info("Got exception for put", e);
164      assertThat(e, instanceOf(RetriesExhaustedException.class));
165      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
166    }
167  }
168
169  /**
170   * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the
171   * operation takes longer than 'hbase.client.operation.timeout'.
172   */
173  @Test
174  public void testMultiPutsTimeout() {
175    DELAY_BATCH_MUTATE = 600;
176    Put put1 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
177    Put put2 = new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE);
178    List<Put> puts = Arrays.asList(put1, put2);
179    try {
180      TABLE.batch(puts, new Object[2]);
181      fail("should not reach here");
182    } catch (Exception e) {
183      LOG.info("Got exception for batch", e);
184      assertThat(e, instanceOf(RetriesExhaustedException.class));
185      assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
186      assertThat(e.getCause().getCause(), instanceOf(CallTimeoutException.class));
187    }
188  }
189
190  /**
191   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
192   * longer than 'hbase.client.scanner.timeout.period'.
193   */
194  @Test
195  public void testScanTimeout() throws IOException, InterruptedException {
196    // cache the region location.
197    try (RegionLocator locator = TABLE.getRegionLocator()) {
198      locator.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
199    }
200    // sleep a bit to make sure the location has been cached as it is an async operation.
201    Thread.sleep(100);
202    DELAY_SCAN = 600;
203    try (ResultScanner scanner = TABLE.getScanner(new Scan())) {
204      scanner.next();
205      fail("should not reach here");
206    } catch (Exception e) {
207      LOG.info("Got exception for scan", e);
208      assertThat(e, instanceOf(RetriesExhaustedException.class));
209      assertThat(e.getCause(), instanceOf(CallTimeoutException.class));
210    }
211  }
212
213  public static final class DelayedRegionServer
214    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
215    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
216      super(conf);
217    }
218
219    @Override
220    protected RSRpcServices createRpcServices() throws IOException {
221      return new DelayedRSRpcServices(this);
222    }
223  }
224
225  /**
226   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
227   */
228  private static final class DelayedRSRpcServices extends RSRpcServices {
229    DelayedRSRpcServices(HRegionServer rs) throws IOException {
230      super(rs);
231    }
232
233    @Override
234    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
235      throws ServiceException {
236      try {
237        Thread.sleep(DELAY_GET);
238      } catch (InterruptedException e) {
239        LOG.error("Sleep interrupted during get operation", e);
240      }
241      return super.get(controller, request);
242    }
243
244    @Override
245    public ClientProtos.MutateResponse mutate(RpcController rpcc,
246      ClientProtos.MutateRequest request) throws ServiceException {
247      try {
248        Thread.sleep(DELAY_MUTATE);
249      } catch (InterruptedException e) {
250        LOG.error("Sleep interrupted during mutate operation", e);
251      }
252      return super.mutate(rpcc, request);
253    }
254
255    @Override
256    public ClientProtos.ScanResponse scan(RpcController controller,
257      ClientProtos.ScanRequest request) throws ServiceException {
258      try {
259        Thread.sleep(DELAY_SCAN);
260      } catch (InterruptedException e) {
261        LOG.error("Sleep interrupted during scan operation", e);
262      }
263      return super.scan(controller, request);
264    }
265
266    @Override
267    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
268      throws ServiceException {
269      try {
270        Thread.sleep(DELAY_BATCH_MUTATE);
271      } catch (InterruptedException e) {
272        LOG.error("Sleep interrupted during multi operation", e);
273      }
274      return super.multi(rpcc, request);
275    }
276  }
277}