001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.net.SocketTimeoutException;
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.ConnectionFactory;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.RetriesExhaustedException;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
035import org.apache.hadoop.hbase.ipc.CallTimeoutException;
036import org.apache.hadoop.hbase.regionserver.HRegionServer;
037import org.apache.hadoop.hbase.regionserver.RSRpcServices;
038import org.apache.hadoop.hbase.testclassification.ClientTests;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.Assert;
043import org.junit.Before;
044import org.junit.BeforeClass;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
052
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
054
055/**
056 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
057 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
058 * injects delays to get, scan and mutate operations.
059 * <p/>
060 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
061 * client will retry the operation 'hbase.client.retries.number' times. After that
062 * {@link SocketTimeoutException} will be thrown.
063 * <p/>
064 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
065 * specified for scan related operations such as openScanner(), next(). If that times out
066 * {@link RetriesExhaustedException} will be thrown.
067 */
068@Category({ ClientTests.class, MediumTests.class })
069public class TestClientOperationTimeout {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
074
075  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
076
077  // Activate the delays after table creation to test get/scan/put
078  private static int DELAY_GET;
079  private static int DELAY_SCAN;
080  private static int DELAY_MUTATE;
081  private static int DELAY_BATCH_MUTATE;
082
083  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
084  private static final byte[] FAMILY = Bytes.toBytes("family");
085  private static final byte[] ROW = Bytes.toBytes("row");
086  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
087  private static final byte[] VALUE = Bytes.toBytes("value");
088
089  private static Connection CONN;
090  private static Table TABLE;
091
092  @BeforeClass
093  public static void setUpClass() throws Exception {
094    // Set RegionServer class and use default values for other options.
095    StartMiniClusterOption option =
096      StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build();
097    UTIL.startMiniCluster(option);
098    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
099      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
100
101    Configuration conf = new Configuration(UTIL.getConfiguration());
102    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
103    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
104    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
105    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
106    CONN = ConnectionFactory.createConnection(conf);
107    TABLE = CONN.getTable(TABLE_NAME);
108  }
109
110  @Before
111  public void setUp() throws Exception {
112    DELAY_GET = 0;
113    DELAY_SCAN = 0;
114    DELAY_MUTATE = 0;
115    DELAY_BATCH_MUTATE = 0;
116  }
117
118  @AfterClass
119  public static void tearDown() throws Exception {
120    Closeables.close(TABLE, true);
121    Closeables.close(CONN, true);
122    UTIL.shutdownMiniCluster();
123  }
124
125  /**
126   * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
127   * longer than 'hbase.client.operation.timeout'.
128   */
129  @Test
130  public void testGetTimeout() {
131    DELAY_GET = 600;
132    try {
133      TABLE.get(new Get(ROW));
134      Assert.fail("should not reach here");
135    } catch (Exception e) {
136      Assert.assertTrue(
137        e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException);
138    }
139  }
140
141  /**
142   * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
143   * longer than 'hbase.client.operation.timeout'.
144   */
145  @Test
146  public void testPutTimeout() {
147    DELAY_MUTATE = 600;
148    Put put = new Put(ROW);
149    put.addColumn(FAMILY, QUALIFIER, VALUE);
150    try {
151      TABLE.put(put);
152      Assert.fail("should not reach here");
153    } catch (Exception e) {
154      Assert.assertTrue(
155        e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException);
156    }
157  }
158
159  /**
160   * Tests that a batch mutate on a table throws {@link SocketTimeoutException} when the operation
161   * takes longer than 'hbase.client.operation.timeout'.
162   */
163  @Test
164  public void testMultiPutsTimeout() {
165    DELAY_BATCH_MUTATE = 600;
166    Put put1 = new Put(ROW);
167    put1.addColumn(FAMILY, QUALIFIER, VALUE);
168    Put put2 = new Put(ROW);
169    put2.addColumn(FAMILY, QUALIFIER, VALUE);
170    List<Put> puts = new ArrayList<>();
171    puts.add(put1);
172    puts.add(put2);
173    try {
174      TABLE.batch(puts, new Object[2]);
175      Assert.fail("should not reach here");
176    } catch (Exception e) {
177      Assert.assertTrue(e instanceof SocketTimeoutException);
178    }
179  }
180
181  /**
182   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
183   * longer than 'hbase.client.scanner.timeout.period'.
184   */
185  @Test
186  public void testScanTimeout() {
187    DELAY_SCAN = 600;
188    try {
189      ResultScanner scanner = TABLE.getScanner(new Scan());
190      scanner.next();
191      Assert.fail("should not reach here");
192    } catch (Exception e) {
193      Assert.assertTrue(
194        e instanceof RetriesExhaustedException && e.getCause() instanceof SocketTimeoutException);
195    }
196  }
197
198  private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
199    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
200      super(conf);
201    }
202
203    @Override
204    protected RSRpcServices createRpcServices() throws IOException {
205      return new DelayedRSRpcServices(this);
206    }
207  }
208
209  /**
210   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
211   */
212  public static class DelayedRSRpcServices extends RSRpcServices {
213    DelayedRSRpcServices(HRegionServer rs) throws IOException {
214      super(rs);
215    }
216
217    @Override
218    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
219      throws ServiceException {
220      try {
221        Thread.sleep(DELAY_GET);
222      } catch (InterruptedException e) {
223        LOG.error("Sleep interrupted during get operation", e);
224      }
225      return super.get(controller, request);
226    }
227
228    @Override
229    public ClientProtos.MutateResponse mutate(RpcController rpcc,
230      ClientProtos.MutateRequest request) throws ServiceException {
231      try {
232        Thread.sleep(DELAY_MUTATE);
233      } catch (InterruptedException e) {
234        LOG.error("Sleep interrupted during mutate operation", e);
235      }
236      return super.mutate(rpcc, request);
237    }
238
239    @Override
240    public ClientProtos.ScanResponse scan(RpcController controller,
241      ClientProtos.ScanRequest request) throws ServiceException {
242      try {
243        Thread.sleep(DELAY_SCAN);
244      } catch (InterruptedException e) {
245        LOG.error("Sleep interrupted during scan operation", e);
246      }
247      return super.scan(controller, request);
248    }
249
250    @Override
251    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
252      throws ServiceException {
253      try {
254        Thread.sleep(DELAY_BATCH_MUTATE);
255      } catch (InterruptedException e) {
256        LOG.error("Sleep interrupted during multi operation", e);
257      }
258      return super.multi(rpcc, request);
259    }
260  }
261}