001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.net.SocketTimeoutException;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
024import org.apache.hadoop.hbase.client.Connection;
025import org.apache.hadoop.hbase.client.ConnectionFactory;
026import org.apache.hadoop.hbase.client.Get;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.ResultScanner;
029import org.apache.hadoop.hbase.client.RetriesExhaustedException;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.regionserver.RSRpcServices;
035import org.apache.hadoop.hbase.testclassification.ClientTests;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.junit.AfterClass;
039import org.junit.Before;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044
045import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
046import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
047import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
048
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
050
051/**
052 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
053 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
054 * injects delays to get, scan and mutate operations.
055 * <p/>
056 * When 'hbase.client.operation.timeout' is set and client operation is not completed in time the
057 * client will retry the operation 'hbase.client.retries.number' times. After that
058 * {@link SocketTimeoutException} will be thrown.
059 * <p/>
060 * Using 'hbase.client.scanner.timeout.period' configuration property similar behavior can be
061 * specified for scan related operations such as openScanner(), next(). If that times out
062 * {@link RetriesExhaustedException} will be thrown.
063 */
064@Category({ ClientTests.class, MediumTests.class })
065public class TestClientOperationTimeout {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestClientOperationTimeout.class);
070
071  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
072
073  // Activate the delays after table creation to test get/scan/put
074  private static int DELAY_GET;
075  private static int DELAY_SCAN;
076  private static int DELAY_MUTATE;
077
078  private static final TableName TABLE_NAME = TableName.valueOf("Timeout");
079  private static final byte[] FAMILY = Bytes.toBytes("family");
080  private static final byte[] ROW = Bytes.toBytes("row");
081  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
082  private static final byte[] VALUE = Bytes.toBytes("value");
083
084  private static Connection CONN;
085  private static Table TABLE;
086
087  @BeforeClass
088  public static void setUpClass() throws Exception {
089    UTIL.startMiniCluster(1, 1, null, null, DelayedRegionServer.class);
090    UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
091      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
092
093    Configuration conf = new Configuration(UTIL.getConfiguration());
094    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
095    conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 500);
096    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500);
097    conf.setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
098    CONN = ConnectionFactory.createConnection(conf);
099    TABLE = CONN.getTable(TABLE_NAME);
100  }
101
102  @Before
103  public void setUp() throws Exception {
104    DELAY_GET = 0;
105    DELAY_SCAN = 0;
106    DELAY_MUTATE = 0;
107  }
108
109  @AfterClass
110  public static void tearDown() throws Exception {
111    Closeables.close(TABLE, true);
112    Closeables.close(CONN, true);
113    UTIL.shutdownMiniCluster();
114  }
115
116  /**
117   * Tests that a get on a table throws {@link SocketTimeoutException} when the operation takes
118   * longer than 'hbase.client.operation.timeout'.
119   */
120  @Test(expected = SocketTimeoutException.class)
121  public void testGetTimeout() throws Exception {
122    DELAY_GET = 600;
123    TABLE.get(new Get(ROW));
124  }
125
126  /**
127   * Tests that a put on a table throws {@link SocketTimeoutException} when the operation takes
128   * longer than 'hbase.client.operation.timeout'.
129   */
130  @Test(expected = SocketTimeoutException.class)
131  public void testPutTimeout() throws Exception {
132    DELAY_MUTATE = 600;
133
134    Put put = new Put(ROW);
135    put.addColumn(FAMILY, QUALIFIER, VALUE);
136    TABLE.put(put);
137  }
138
139  /**
140   * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
141   * longer than 'hbase.client.scanner.timeout.period'.
142   */
143  @Test(expected = RetriesExhaustedException.class)
144  public void testScanTimeout() throws Exception {
145    DELAY_SCAN = 600;
146    ResultScanner scanner = TABLE.getScanner(new Scan());
147    scanner.next();
148  }
149
150  private static class DelayedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
151    public DelayedRegionServer(Configuration conf) throws IOException, InterruptedException {
152      super(conf);
153    }
154
155    @Override
156    protected RSRpcServices createRpcServices() throws IOException {
157      return new DelayedRSRpcServices(this);
158    }
159  }
160
161  /**
162   * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
163   */
164  public static class DelayedRSRpcServices extends RSRpcServices {
165    DelayedRSRpcServices(HRegionServer rs) throws IOException {
166      super(rs);
167    }
168
169    @Override
170    public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
171        throws ServiceException {
172      try {
173        Thread.sleep(DELAY_GET);
174      } catch (InterruptedException e) {
175        LOG.error("Sleep interrupted during get operation", e);
176      }
177      return super.get(controller, request);
178    }
179
180    @Override
181    public ClientProtos.MutateResponse mutate(RpcController rpcc,
182        ClientProtos.MutateRequest request) throws ServiceException {
183      try {
184        Thread.sleep(DELAY_MUTATE);
185      } catch (InterruptedException e) {
186        LOG.error("Sleep interrupted during mutate operation", e);
187      }
188      return super.mutate(rpcc, request);
189    }
190
191    @Override
192    public ClientProtos.ScanResponse scan(RpcController controller,
193        ClientProtos.ScanRequest request) throws ServiceException {
194      try {
195        Thread.sleep(DELAY_SCAN);
196      } catch (InterruptedException e) {
197        LOG.error("Sleep interrupted during scan operation", e);
198      }
199      return super.scan(controller, request);
200    }
201  }
202}