001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.net.SocketTimeoutException;
022import java.util.ArrayList;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.client.ConnectionFactory;
028import org.apache.hadoop.hbase.client.Get;
029import org.apache.hadoop.hbase.client.Put;
030import org.apache.hadoop.hbase.client.ResultScanner;
031import org.apache.hadoop.hbase.client.RetriesExhaustedException;
032import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.client.Table;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.ipc.CallTimeoutException;
037import org.apache.hadoop.hbase.regionserver.HRegionServer;
038import org.apache.hadoop.hbase.regionserver.RSRpcServices;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.AfterClass;
043import org.junit.Assert;
044import org.junit.Before;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
051import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
052import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
053
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
055
056/**
057 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
058 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
059 * injects delays to get, scan and mutate operations.
060 * <p/>
061 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
062 * client will retry the operation 'hbase.client.retries.number' times. After that
063 * {@link SocketTimeoutException} will be thrown.
064 * <p/>
065 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
066 * specified for scan related operations such as openScanner(), next(). If that times out
067 * {@link RetriesExhaustedException} will be thrown.
068 */
069@Category({ ClientTests.class, MediumTests.class })
070public class TestClientOperationTimeout {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
075
076  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
077
078  // Activate the delays after table creation to test get/scan/put
079  private static int DELAY_GET;
080  private static int DELAY_SCAN;
081  private static int DELAY_MUTATE;
082  private static int DELAY_BATCH_MUTATE;
083
084  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
085  private static final byte[] FAMILY = Bytes.toBytes("family");
086  private static final byte[] ROW = Bytes.toBytes("row");
087  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
088  private static final byte[] VALUE = Bytes.toBytes("value");
089
090  private static Connection CONN;
091  private static Table TABLE;
092
093  @BeforeClass
094  public static void setUpClass() throws Exception {
095    // Set RegionServer class and use default values for other options.
096    StartMiniClusterOption option =
097      StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build();
098    UTIL.startMiniCluster(option);
099    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
100      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
101
102    Configuration conf = new Configuration(UTIL.getConfiguration());
103    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
104    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
105    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
106    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
107    CONN = ConnectionFactory.createConnection(conf);
108    TABLE = CONN.getTable(TABLE_NAME);
109  }
110
111  @Before
112  public void setUp() throws Exception {
113    DELAY_GET = 0;
114    DELAY_SCAN = 0;
115    DELAY_MUTATE = 0;
116    DELAY_BATCH_MUTATE = 0;
117  }
118
119  @AfterClass
120  public static void tearDown() throws Exception {
121    Closeables.close(TABLE, true);
122    Closeables.close(CONN, true);
123    UTIL.shutdownMiniCluster();
124  }
125
126  /**
127   * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
128   * longer than 'hbase.client.operation.timeout'.
129   */
130  @Test
131  public void testGetTimeout() {
132    DELAY_GET = 600;
133    try {
134      TABLE.get(new Get(ROW));
135      Assert.fail("should not reach here");
136    } catch (Exception e) {
137      Assert.assertTrue(
138        e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException);
139    }
140  }
141
142  /**
143   * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
144   * longer than 'hbase.client.operation.timeout'.
145   */
146  @Test
147  public void testPutTimeout() {
148    DELAY_MUTATE = 600;
149    Put put = new Put(ROW);
150    put.addColumn(FAMILY, QUALIFIER, VALUE);
151    try {
152      TABLE.put(put);
153      Assert.fail("should not reach here");
154    } catch (Exception e) {
155      Assert.assertTrue(
156        e instanceof SocketTimeoutException && e.getCause() instanceof CallTimeoutException);
157    }
158  }
159
160  /**
161   * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the
162   * operation takes longer than 'hbase.client.operation.timeout'.
163   */
164  @Test
165  public void testMultiPutsTimeout() {
166    DELAY_BATCH_MUTATE = 600;
167    Put put1 = new Put(ROW);
168    put1.addColumn(FAMILY, QUALIFIER, VALUE);
169    Put put2 = new Put(ROW);
170    put2.addColumn(FAMILY, QUALIFIER, VALUE);
171    List<Put> puts = new ArrayList<>();
172    puts.add(put1);
173    puts.add(put2);
174    try {
175      TABLE.batch(puts, new Object[2]);
176      Assert.fail("should not reach here");
177    } catch (Exception e) {
178      Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException);
179    }
180  }
181
182  /**
183   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
184   * longer than 'hbase.client.scanner.timeout.period'.
185   */
186  @Test
187  public void testScanTimeout() {
188    DELAY_SCAN = 600;
189    try {
190      ResultScanner scanner = TABLE.getScanner(new Scan());
191      scanner.next();
192      Assert.fail("should not reach here");
193    } catch (Exception e) {
194      Assert.assertTrue(
195        e instanceof RetriesExhaustedException && e.getCause() instanceof SocketTimeoutException);
196    }
197  }
198
199  private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
200    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
201      super(conf);
202    }
203
204    @Override
205    protected RSRpcServices createRpcServices() throws IOException {
206      return new DelayedRSRpcServices(this);
207    }
208  }
209
210  /**
211   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
212   */
213  public static class DelayedRSRpcServices extends RSRpcServices {
214    DelayedRSRpcServices(HRegionServer rs) throws IOException {
215      super(rs);
216    }
217
218    @Override
219    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
220        throws ServiceException {
221      try {
222        Thread.sleep(DELAY_GET);
223      } catch (InterruptedException e) {
224        LOG.error("Sleep interrupted during get operation", e);
225      }
226      return super.get(controller, request);
227    }
228
229    @Override
230    public ClientProtos.MutateResponse mutate(RpcController rpcc,
231        ClientProtos.MutateRequest request) throws ServiceException {
232      try {
233        Thread.sleep(DELAY_MUTATE);
234      } catch (InterruptedException e) {
235        LOG.error("Sleep interrupted during mutate operation", e);
236      }
237      return super.mutate(rpcc, request);
238    }
239
240    @Override
241    public ClientProtos.ScanResponse scan(RpcController controller,
242        ClientProtos.ScanRequest request) throws ServiceException {
243      try {
244        Thread.sleep(DELAY_SCAN);
245      } catch (InterruptedException e) {
246        LOG.error("Sleep interrupted during scan operation", e);
247      }
248      return super.scan(controller, request);
249    }
250
251    @Override
252    public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
253        throws ServiceException {
254      try {
255        Thread.sleep(DELAY_BATCH_MUTATE);
256      } catch (InterruptedException e) {
257        LOG.error("Sleep interrupted during multi operation", e);
258      }
259      return super.multi(rpcc, request);
260    }
261  }
262}