001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseClassTestRule;
025import org.apache.hadoop.hbase.HBaseTestingUtility;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.regionserver.HRegionServer;
030import org.apache.hadoop.hbase.regionserver.RSRpcServices;
031import org.apache.hadoop.hbase.testclassification.ClientTests;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.junit.AfterClass;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.Rule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.junit.rules.TestName;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
049
050/**
051 * Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and
052 * getting retried. This scenario should not result in some data being skipped at RS side.
053 */
054@Category({MediumTests.class, ClientTests.class})
055public class TestClientScannerRPCTimeout {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059      HBaseClassTestRule.forClass(TestClientScannerRPCTimeout.class);
060
061  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerRPCTimeout.class);
062  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
063  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
064  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
065  private static final byte[] VALUE = Bytes.toBytes("testValue");
066  private static final int rpcTimeout = 2 * 1000;
067  private static final int CLIENT_RETRIES_NUMBER = 3;
068
069  @Rule
070  public TestName name = new TestName();
071
072  @BeforeClass
073  public static void setUpBeforeClass() throws Exception {
074    Configuration conf = TEST_UTIL.getConfiguration();
075    // Don't report so often so easier to see other rpcs
076    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
077    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
078    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
079    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
080    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
081    TEST_UTIL.startMiniCluster(1);
082  }
083
084  @AfterClass
085  public static void tearDownAfterClass() throws Exception {
086    TEST_UTIL.shutdownMiniCluster();
087  }
088
089  @Test
090  public void testScannerNextRPCTimesout() throws Exception {
091    final TableName tableName = TableName.valueOf(name.getMethodName());
092    Table ht = TEST_UTIL.createTable(tableName, FAMILY);
093    byte[] r0 = Bytes.toBytes("row-0");
094    byte[] r1 = Bytes.toBytes("row-1");
095    byte[] r2 = Bytes.toBytes("row-2");
096    byte[] r3 = Bytes.toBytes("row-3");
097    putToTable(ht, r0);
098    putToTable(ht, r1);
099    putToTable(ht, r2);
100    putToTable(ht, r3);
101    LOG.info("Wrote our three values");
102    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
103    Scan scan = new Scan();
104    scan.setCaching(1);
105    ResultScanner scanner = ht.getScanner(scan);
106    Result result = scanner.next();
107    // fetched when openScanner
108    assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow()));
109    result = scanner.next();
110    assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
111    LOG.info("Got expected first row");
112    long t1 = System.currentTimeMillis();
113    result = scanner.next();
114    assertTrue((System.currentTimeMillis() - t1) > rpcTimeout);
115    assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
116    RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
117    result = scanner.next();
118    assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
119    scanner.close();
120
121    // test the case that RPC is always timesout
122    scanner = ht.getScanner(scan);
123    RSRpcServicesWithScanTimeout.sleepAlways = true;
124    RSRpcServicesWithScanTimeout.tryNumber = 0;
125    try {
126      result = scanner.next();
127    } catch (IOException ioe) {
128      // catch the exception after max retry number
129      LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
130    }
131    assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
132        + ", actual =" + RSRpcServicesWithScanTimeout.tryNumber,
133        RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
134  }
135
136  private void putToTable(Table ht, byte[] rowkey) throws IOException {
137    Put put = new Put(rowkey);
138    put.addColumn(FAMILY, QUALIFIER, VALUE);
139    ht.put(put);
140  }
141
142  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
143    public RegionServerWithScanTimeout(Configuration conf)
144        throws IOException, InterruptedException {
145      super(conf);
146    }
147
148    @Override
149    protected RSRpcServices createRpcServices() throws IOException {
150      return new RSRpcServicesWithScanTimeout(this);
151    }
152  }
153
154  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
155    private long tableScannerId;
156    private boolean slept;
157    private static long seqNoToSleepOn = -1;
158    private static boolean sleepAlways = false;
159    private static int tryNumber = 0;
160
161    public RSRpcServicesWithScanTimeout(HRegionServer rs)
162        throws IOException {
163      super(rs);
164    }
165
166    @Override
167    public ScanResponse scan(final RpcController controller, final ScanRequest request)
168        throws ServiceException {
169      if (request.hasScannerId()) {
170        ScanResponse scanResponse = super.scan(controller, request);
171        if (this.tableScannerId == request.getScannerId() &&
172            (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
173          try {
174            LOG.info("SLEEPING " + (rpcTimeout + 500));
175            Thread.sleep(rpcTimeout + 500);
176          } catch (InterruptedException e) {
177          }
178          slept = true;
179          tryNumber++;
180          if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
181            sleepAlways = false;
182          }
183        }
184        return scanResponse;
185      } else {
186        ScanResponse scanRes = super.scan(controller, request);
187        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
188        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
189          tableScannerId = scanRes.getScannerId();
190        }
191        return scanRes;
192      }
193    }
194  }
195}