001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.net.SocketTimeoutException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.ConnectionFactory;
026import org.apache.hadoop.hbase.client.Get;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.ResultScanner;
029import org.apache.hadoop.hbase.client.RetriesExhaustedException;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.RSRpcServices;
035import org.apache.hadoop.hbase.testclassification.ClientTests;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044
045import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
046import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
047import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
050
051/**
052 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
053 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
054 * injects delays to get, scan and mutate operations.
055 * <p/>
056 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
057 * client will retry the operation 'hbase.client.retries.number' times. After that
058 * {@link SocketTimeoutException} will be thrown.
059 * <p/>
060 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
061 * specified for scan related operations such as openScanner(), next(). If that times out
062 * {@link RetriesExhaustedException} will be thrown.
063 */
064@Category({ ClientTests.class, MediumTests.class })
065public class TestClientOperationTimeout {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
070
071  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
072
073  // Activate the delays after table creation to test get/scan/put
074  private static int DELAY_GET;
075  private static int DELAY_SCAN;
076  private static int DELAY_MUTATE;
077
078  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
079  private static final byte[] FAMILY = Bytes.toBytes("family");
080  private static final byte[] ROW = Bytes.toBytes("row");
081  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
082  private static final byte[] VALUE = Bytes.toBytes("value");
083
084  private static Connection CONN;
085  private static Table TABLE;
086
087  @BeforeClass
088  public static void setUpClass() throws Exception {
089    // Set RegionServer class and use default values for other options.
090    StartMiniClusterOption option =
091      StartMiniClusterOption.builder().rsClass(DelayedRegionServer.class).build();
092    UTIL.startMiniCluster(option);
093    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
094      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
095
096    Configuration conf = new Configuration(UTIL.getConfiguration());
097    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
098    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
099    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
100    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
101    CONN = ConnectionFactory.createConnection(conf);
102    TABLE = CONN.getTable(TABLE_NAME);
103  }
104
105  @Before
106  public void setUp() throws Exception {
107    DELAY_GET = 0;
108    DELAY_SCAN = 0;
109    DELAY_MUTATE = 0;
110  }
111
112  @AfterClass
113  public static void tearDown() throws Exception {
114    Closeables.close(TABLE, true);
115    Closeables.close(CONN, true);
116    UTIL.shutdownMiniCluster();
117  }
118
119  /**
120   * Tests that a get on a table throws {@link SocketTimeoutException} when the operation takes
121   * longer than 'hbase.client.operation.timeout'.
122   */
123  @Test(expected = SocketTimeoutException.class)
124  public void testGetTimeout() throws Exception {
125    DELAY_GET = 600;
126    TABLE.get(new Get(ROW));
127  }
128
129  /**
130   * Tests that a put on a table throws {@link SocketTimeoutException} when the operation takes
131   * longer than 'hbase.client.operation.timeout'.
132   */
133  @Test(expected = SocketTimeoutException.class)
134  public void testPutTimeout() throws Exception {
135    DELAY_MUTATE = 600;
136
137    Put put = new Put(ROW);
138    put.addColumn(FAMILY, QUALIFIER, VALUE);
139    TABLE.put(put);
140  }
141
142  /**
143   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
144   * longer than 'hbase.client.scanner.timeout.period'.
145   */
146  @Test(expected = RetriesExhaustedException.class)
147  public void testScanTimeout() throws Exception {
148    DELAY_SCAN = 600;
149    ResultScanner scanner = TABLE.getScanner(new Scan());
150    scanner.next();
151  }
152
153  private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
154    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
155      super(conf);
156    }
157
158    @Override
159    protected RSRpcServices createRpcServices() throws IOException {
160      return new DelayedRSRpcServices(this);
161    }
162  }
163
164  /**
165   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
166   */
167  public static class DelayedRSRpcServices extends RSRpcServices {
168    DelayedRSRpcServices(HRegionServer rs) throws IOException {
169      super(rs);
170    }
171
172    @Override
173    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
174        throws ServiceException {
175      try {
176        Thread.sleep(DELAY_GET);
177      } catch (InterruptedException e) {
178        LOG.error("Sleep interrupted during get operation", e);
179      }
180      return super.get(controller, request);
181    }
182
183    @Override
184    public ClientProtos.MutateResponse mutate(RpcController rpcc,
185        ClientProtos.MutateRequest request) throws ServiceException {
186      try {
187        Thread.sleep(DELAY_MUTATE);
188      } catch (InterruptedException e) {
189        LOG.error("Sleep interrupted during mutate operation", e);
190      }
191      return super.mutate(rpcc, request);
192    }
193
194    @Override
195    public ClientProtos.ScanResponse scan(RpcController controller,
196        ClientProtos.ScanRequest request) throws ServiceException {
197      try {
198        Thread.sleep(DELAY_SCAN);
199      } catch (InterruptedException e) {
200        LOG.error("Sleep interrupted during scan operation", e);
201      }
202      return super.scan(controller, request);
203    }
204  }
205}