001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
021import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.function.Supplier;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.NamespaceDescriptor;
033import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
036import org.apache.hadoop.hbase.ipc.CallTimeoutException;
037import org.apache.hadoop.hbase.regionserver.HRegionServer;
038import org.apache.hadoop.hbase.regionserver.RSRpcServices;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Rule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048import org.junit.rules.TestName;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
053import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
054
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
057
058@Category({ MediumTests.class, ClientTests.class })
059public class TestClientScannerTimeouts {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
064
065  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
066  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
067
068  private static AsyncConnection ASYNC_CONN;
069  private static Connection CONN;
070  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
071  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
072  private static final byte[] VALUE = Bytes.toBytes("testValue");
073
074  private static final byte[] ROW0 = Bytes.toBytes("row-0");
075  private static final byte[] ROW1 = Bytes.toBytes("row-1");
076  private static final byte[] ROW2 = Bytes.toBytes("row-2");
077  private static final byte[] ROW3 = Bytes.toBytes("row-3");
078  private static final int rpcTimeout = 1000;
079  private static final int scanTimeout = 3 * rpcTimeout;
080  private static final int metaScanTimeout = 6 * rpcTimeout;
081  private static final int CLIENT_RETRIES_NUMBER = 3;
082
083  private static Table table;
084  private static AsyncTable<AdvancedScanResultConsumer> asyncTable;
085
086  @Rule
087  public TestName name = new TestName();
088
089  @BeforeClass
090  public static void setUpBeforeClass() throws Exception {
091    Configuration conf = TEST_UTIL.getConfiguration();
092    // Don't report so often so easier to see other rpcs
093    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
094    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
095    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
096    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
097    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
098    TEST_UTIL.startMiniCluster(1);
099
100    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
101    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
102    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
103    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
104    CONN = ASYNC_CONN.toConnection();
105  }
106
107  @AfterClass
108  public static void tearDownAfterClass() throws Exception {
109    CONN.close();
110    ASYNC_CONN.close();
111    TEST_UTIL.shutdownMiniCluster();
112  }
113
114  public void setup(boolean isSystemTable) throws IOException {
115    RSRpcServicesWithScanTimeout.reset();
116
117    String nameAsString = name.getMethodName();
118    if (isSystemTable) {
119      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
120    }
121    final TableName tableName = TableName.valueOf(nameAsString);
122
123    TEST_UTIL.createTable(tableName, FAMILY);
124    table = CONN.getTable(tableName);
125    asyncTable = ASYNC_CONN.getTable(tableName);
126    putToTable(table, ROW0);
127    putToTable(table, ROW1);
128    putToTable(table, ROW2);
129    putToTable(table, ROW3);
130    LOG.info("Wrote our four values");
131
132    table.getRegionLocator().getAllRegionLocations();
133
134    // reset again incase the creation/population caused anything to trigger
135    RSRpcServicesWithScanTimeout.reset();
136  }
137
138  private void expectRow(byte[] expected, Result result) {
139    assertTrue("Expected row: " + Bytes.toString(expected),
140      Bytes.equals(expected, result.getRow()));
141  }
142
143  private void expectNumTries(int expected) {
144    assertEquals(
145      "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
146      expected, RSRpcServicesWithScanTimeout.tryNumber);
147    // reset for next
148    RSRpcServicesWithScanTimeout.tryNumber = 0;
149  }
150
151  /**
152   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
153   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
154   * erroneous next() call. This is relatively hard to do these days because the server attempts to
155   * always return before the timeout. In this test we force the server to throw this exception, so
156   * that we can test the retry logic appropriately.
157   */
158  @Test
159  public void testRetryOutOfOrderScannerNextException() throws IOException {
160    expectRetryOutOfOrderScannerNext(this::getScanner);
161  }
162
163  /**
164   * AsyncTable version of above
165   */
166  @Test
167  public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
168    expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
169  }
170
171  /**
172   * verify that we honor the {@link HConstants#HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD} for normal
173   * scans.
174   */
175  @Test
176  public void testNormalScanTimeoutOnNext() throws IOException {
177    setup(false);
178    expectTimeoutOnNext(scanTimeout, this::getScanner);
179  }
180
181  /**
182   * AsyncTable version of above
183   */
184  @Test
185  public void testNormalScanTimeoutOnNextAsync() throws IOException {
186    setup(false);
187    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
188  }
189
190  /**
191   * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
192   * meta scans
193   */
194  @Test
195  public void testNormalScanTimeoutOnOpenScanner() throws IOException {
196    setup(false);
197    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
198  }
199
200  /**
201   * AsyncTable version of above
202   */
203  @Test
204  public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
205    setup(false);
206    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
207  }
208
209  /**
210   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
211   * next() calls in meta scans
212   */
213  @Test
214  public void testMetaScanTimeoutOnNext() throws IOException {
215    setup(true);
216    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
217  }
218
219  /**
220   * AsyncTable version of above
221   */
222  @Test
223  public void testMetaScanTimeoutOnNextAsync() throws IOException {
224    setup(true);
225    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
226  }
227
228  /**
229   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
230   * openScanner() calls for meta scans
231   */
232  @Test
233  public void testMetaScanTimeoutOnOpenScanner() throws IOException {
234    setup(true);
235    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
236  }
237
238  /**
239   * AsyncTable version of above
240   */
241  @Test
242  public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
243    setup(true);
244    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
245  }
246
247  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
248    throws IOException {
249    setup(false);
250    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
251
252    LOG.info(
253      "Opening scanner, expecting no errors from first next() call from openScanner response");
254    ResultScanner scanner = scannerSupplier.get();
255    Result result = scanner.next();
256    expectRow(ROW0, result);
257    expectNumTries(0);
258
259    LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
260    result = scanner.next();
261    expectRow(ROW1, result);
262    expectNumTries(0);
263
264    LOG.info(
265      "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
266    result = scanner.next();
267    expectRow(ROW2, result);
268    expectNumTries(1);
269
270    // reset so no errors. since last call restarted the scan and following
271    // call would otherwise fail
272    RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
273
274    LOG.info("Finishing scan, expecting no errors");
275    result = scanner.next();
276    expectRow(ROW3, result);
277    scanner.close();
278
279    LOG.info("Testing always throw exception");
280    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
281    int i = 0;
282
283    // test the case that RPC always throws
284    scanner = scannerSupplier.get();
285    RSRpcServicesWithScanTimeout.throwAlways = true;
286
287    while (true) {
288      LOG.info("Calling scanner.next()");
289      result = scanner.next();
290      if (result == null) {
291        break;
292      } else {
293        byte[] expectedResult = expectedResults[i++];
294        expectRow(expectedResult, result);
295      }
296    }
297
298    // ensure we verified all rows. this along with the expectRow check above
299    // proves that we didn't miss any rows.
300    assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
301      + ", actual index=" + i, expectedResults.length, i);
302
303    // expect all but the first row (which came from initial openScanner) to have thrown an error
304    expectNumTries(expectedResults.length - 1);
305
306  }
307
308  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
309    throws IOException {
310    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
311    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
312
313    LOG.info(
314      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
315    ResultScanner scanner = scannerSupplier.get();
316    Result result = scanner.next();
317    expectRow(ROW0, result);
318
319    LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
320    result = scanner.next();
321    expectRow(ROW1, result);
322
323    LOG.info("Making second next() RPC, expecting timeout");
324    long start = System.nanoTime();
325    try {
326      scanner.next();
327      fail("Expected CallTimeoutException");
328    } catch (RetriesExhaustedException e) {
329      assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException);
330    }
331    expectTimeout(start, timeout);
332  }
333
334  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
335    throws IOException {
336    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
337    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
338    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
339    long start = System.nanoTime();
340    try {
341      scannerSupplier.get().next();
342      fail("Expected CallTimeoutException");
343    } catch (RetriesExhaustedException e) {
344      assertTrue("Expected CallTimeoutException, but was " + e.getCause(),
345        e.getCause() instanceof CallTimeoutException);
346    }
347    expectTimeout(start, timeout);
348  }
349
350  private void expectTimeout(long start, int timeout) {
351    long duration = System.nanoTime() - start;
352    assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout);
353  }
354
355  private ResultScanner getScanner() {
356    Scan scan = new Scan();
357    scan.setCaching(1);
358    try {
359      return table.getScanner(scan);
360    } catch (IOException e) {
361      throw new RuntimeException(e);
362    }
363  }
364
365  private ResultScanner getAsyncScanner() {
366    Scan scan = new Scan();
367    scan.setCaching(1);
368    return asyncTable.getScanner(scan);
369  }
370
371  private void putToTable(Table ht, byte[] rowkey) throws IOException {
372    Put put = new Put(rowkey);
373    put.addColumn(FAMILY, QUALIFIER, VALUE);
374    ht.put(put);
375  }
376
377  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
378    public RegionServerWithScanTimeout(Configuration conf)
379      throws IOException, InterruptedException {
380      super(conf);
381    }
382
383    @Override
384    protected RSRpcServices createRpcServices() throws IOException {
385      return new RSRpcServicesWithScanTimeout(this);
386    }
387  }
388
389  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
390    private long tableScannerId;
391
392    private static long seqNoToThrowOn = -1;
393    private static boolean throwAlways = false;
394    private static boolean threw;
395
396    private static long seqNoToSleepOn = -1;
397    private static boolean sleepOnOpen = false;
398    private static volatile boolean slept;
399    private static int tryNumber = 0;
400
401    private static int sleepTime = rpcTimeout + 500;
402
403    public static void setSleepForTimeout(int timeout) {
404      sleepTime = timeout + 500;
405    }
406
407    public static void reset() {
408      setSleepForTimeout(scanTimeout);
409
410      seqNoToSleepOn = -1;
411      seqNoToThrowOn = -1;
412      throwAlways = false;
413      threw = false;
414      sleepOnOpen = false;
415      slept = false;
416      tryNumber = 0;
417    }
418
419    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
420      super(rs);
421    }
422
423    @Override
424    public ScanResponse scan(final RpcController controller, final ScanRequest request)
425      throws ServiceException {
426      if (request.hasScannerId()) {
427        ScanResponse scanResponse = super.scan(controller, request);
428        if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
429          return scanResponse;
430        }
431
432        if (
433          throwAlways
434            || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
435        ) {
436          threw = true;
437          tryNumber++;
438          LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
439            tableScannerId);
440          throw new ServiceException(new OutOfOrderScannerNextException());
441        }
442
443        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
444          try {
445            LOG.info("SLEEPING " + sleepTime);
446            Thread.sleep(sleepTime);
447          } catch (InterruptedException e) {
448          }
449          slept = true;
450          tryNumber++;
451        }
452        return scanResponse;
453      } else {
454        ScanResponse scanRes = super.scan(controller, request);
455        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
456        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
457          tableScannerId = scanRes.getScannerId();
458          if (sleepOnOpen) {
459            try {
460              LOG.info("openScanner SLEEPING " + sleepTime);
461              Thread.sleep(sleepTime);
462            } catch (InterruptedException e) {
463            }
464          }
465        }
466        return scanRes;
467      }
468    }
469  }
470}