001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
054
055@Tag(ClientTests.TAG)
056@Tag(LargeTests.TAG)
057public class TestClientScannerTimeouts {
058
059  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
060  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
061
062  private static AsyncConnection ASYNC_CONN;
063  private static Connection CONN;
064  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
065  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
066  private static final byte[] VALUE = Bytes.toBytes("testValue");
067
068  private static final byte[] ROW0 = Bytes.toBytes("row-0");
069  private static final byte[] ROW1 = Bytes.toBytes("row-1");
070  private static final byte[] ROW2 = Bytes.toBytes("row-2");
071  private static final byte[] ROW3 = Bytes.toBytes("row-3");
072  private static final int rpcTimeout = 1000;
073  private static final int scanTimeout = 3 * rpcTimeout;
074  private static final int metaScanTimeout = 6 * rpcTimeout;
075  private static final int CLIENT_RETRIES_NUMBER = 3;
076
077  private static Table table;
078  private static AsyncTable<AdvancedScanResultConsumer> asyncTable;
079
080  @BeforeAll
081  public static void setUpBeforeClass() throws Exception {
082    Configuration conf = TEST_UTIL.getConfiguration();
083    // Don't report so often so easier to see other rpcs
084    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
085    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
086    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
087    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
088    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
089    TEST_UTIL.startMiniCluster(1);
090
091    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
092    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
093    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
094    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
095    CONN = ASYNC_CONN.toConnection();
096  }
097
098  @AfterAll
099  public static void tearDownAfterClass() throws Exception {
100    CONN.close();
101    ASYNC_CONN.close();
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  public void setup(boolean isSystemTable, TestInfo testInfo) throws IOException {
106    RSRpcServicesWithScanTimeout.reset();
107
108    String nameAsString = testInfo.getTestMethod().get().getName();
109    if (isSystemTable) {
110      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
111    }
112    final TableName tableName = TableName.valueOf(nameAsString);
113
114    TEST_UTIL.createTable(tableName, FAMILY);
115    table = CONN.getTable(tableName);
116    asyncTable = ASYNC_CONN.getTable(tableName);
117    putToTable(table, ROW0);
118    putToTable(table, ROW1);
119    putToTable(table, ROW2);
120    putToTable(table, ROW3);
121    LOG.info("Wrote our four values");
122
123    table.getRegionLocator().getAllRegionLocations();
124
125    // reset again incase the creation/population caused anything to trigger
126    RSRpcServicesWithScanTimeout.reset();
127  }
128
129  private void expectRow(byte[] expected, Result result) {
130    assertTrue(Bytes.equals(expected, result.getRow()),
131      "Expected row: " + Bytes.toString(expected));
132  }
133
134  private void expectNumTries(int expected) {
135    assertEquals(expected, RSRpcServicesWithScanTimeout.tryNumber,
136      "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber);
137    // reset for next
138    RSRpcServicesWithScanTimeout.tryNumber = 0;
139  }
140
141  /**
142   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
143   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
144   * erroneous next() call. This is relatively hard to do these days because the server attempts to
145   * always return before the timeout. In this test we force the server to throw this exception, so
146   * that we can test the retry logic appropriately.
147   */
148  @Test
149  public void testRetryOutOfOrderScannerNextException(TestInfo testInfo) throws IOException {
150    expectRetryOutOfOrderScannerNext(this::getScanner, testInfo);
151  }
152
153  /**
154   * AsyncTable version of above
155   */
156  @Test
157  public void testRetryOutOfOrderScannerNextExceptionAsync(TestInfo testInfo) throws IOException {
158    expectRetryOutOfOrderScannerNext(this::getAsyncScanner, testInfo);
159  }
160
161  /**
162   * verify that we honor the {@link HConstants#HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD} for normal
163   * scans.
164   */
165  @Test
166  public void testNormalScanTimeoutOnNext(TestInfo testInfo) throws IOException {
167    setup(false, testInfo);
168    expectTimeoutOnNext(scanTimeout, this::getScanner);
169  }
170
171  /**
172   * AsyncTable version of above
173   */
174  @Test
175  public void testNormalScanTimeoutOnNextAsync(TestInfo testInfo) throws IOException {
176    setup(false, testInfo);
177    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
178  }
179
180  /**
181   * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
182   * meta scans
183   */
184  @Test
185  public void testNormalScanTimeoutOnOpenScanner(TestInfo testInfo) throws IOException {
186    setup(false, testInfo);
187    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
188  }
189
190  /**
191   * AsyncTable version of above
192   */
193  @Test
194  public void testNormalScanTimeoutOnOpenScannerAsync(TestInfo testInfo) throws IOException {
195    setup(false, testInfo);
196    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
197  }
198
199  /**
200   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
201   * next() calls in meta scans
202   */
203  @Test
204  public void testMetaScanTimeoutOnNext(TestInfo testInfo) throws IOException {
205    setup(true, testInfo);
206    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
207  }
208
209  /**
210   * AsyncTable version of above
211   */
212  @Test
213  public void testMetaScanTimeoutOnNextAsync(TestInfo testInfo) throws IOException {
214    setup(true, testInfo);
215    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
216  }
217
218  /**
219   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
220   * openScanner() calls for meta scans
221   */
222  @Test
223  public void testMetaScanTimeoutOnOpenScanner(TestInfo testInfo) throws IOException {
224    setup(true, testInfo);
225    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
226  }
227
228  /**
229   * AsyncTable version of above
230   */
231  @Test
232  public void testMetaScanTimeoutOnOpenScannerAsync(TestInfo testInfo) throws IOException {
233    setup(true, testInfo);
234    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
235  }
236
237  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier,
238    TestInfo testInfo) throws IOException {
239    setup(false, testInfo);
240    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
241
242    LOG.info(
243      "Opening scanner, expecting no errors from first next() call from openScanner response");
244    ResultScanner scanner = scannerSupplier.get();
245    Result result = scanner.next();
246    expectRow(ROW0, result);
247    expectNumTries(0);
248
249    LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
250    result = scanner.next();
251    expectRow(ROW1, result);
252    expectNumTries(0);
253
254    LOG.info(
255      "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
256    result = scanner.next();
257    expectRow(ROW2, result);
258    expectNumTries(1);
259
260    // reset so no errors. since last call restarted the scan and following
261    // call would otherwise fail
262    RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
263
264    LOG.info("Finishing scan, expecting no errors");
265    result = scanner.next();
266    expectRow(ROW3, result);
267    scanner.close();
268
269    LOG.info("Testing always throw exception");
270    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
271    int i = 0;
272
273    // test the case that RPC always throws
274    scanner = scannerSupplier.get();
275    RSRpcServicesWithScanTimeout.throwAlways = true;
276
277    while (true) {
278      LOG.info("Calling scanner.next()");
279      result = scanner.next();
280      if (result == null) {
281        break;
282      } else {
283        byte[] expectedResult = expectedResults[i++];
284        expectRow(expectedResult, result);
285      }
286    }
287
288    // ensure we verified all rows. this along with the expectRow check above
289    // proves that we didn't miss any rows.
290    assertEquals(expectedResults.length, i, "Expected to exhaust expectedResults array length="
291      + expectedResults.length + ", actual index=" + i);
292
293    // expect all but the first row (which came from initial openScanner) to have thrown an error
294    expectNumTries(expectedResults.length - 1);
295
296  }
297
298  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
299    throws IOException {
300    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
301    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
302
303    LOG.info(
304      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
305    ResultScanner scanner = scannerSupplier.get();
306    Result result = scanner.next();
307    expectRow(ROW0, result);
308
309    LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
310    result = scanner.next();
311    expectRow(ROW1, result);
312
313    LOG.info("Making second next() RPC, expecting timeout");
314    long start = System.nanoTime();
315    try {
316      scanner.next();
317      fail("Expected CallTimeoutException");
318    } catch (RetriesExhaustedException e) {
319      assertTrue(e.getCause() instanceof CallTimeoutException, "Expected CallTimeoutException");
320    }
321    expectTimeout(start, timeout);
322  }
323
324  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
325    throws IOException {
326    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
327    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
328    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
329    long start = System.nanoTime();
330    try {
331      scannerSupplier.get().next();
332      fail("Expected CallTimeoutException");
333    } catch (RetriesExhaustedException e) {
334      assertTrue(e.getCause() instanceof CallTimeoutException,
335        "Expected CallTimeoutException, but was " + e.getCause());
336    }
337    expectTimeout(start, timeout);
338  }
339
340  private void expectTimeout(long start, int timeout) {
341    long duration = System.nanoTime() - start;
342    assertTrue(duration >= timeout, "Expected duration >= " + timeout + ", but was " + duration);
343  }
344
345  private ResultScanner getScanner() {
346    Scan scan = new Scan();
347    scan.setCaching(1);
348    try {
349      return table.getScanner(scan);
350    } catch (IOException e) {
351      throw new RuntimeException(e);
352    }
353  }
354
355  private ResultScanner getAsyncScanner() {
356    Scan scan = new Scan();
357    scan.setCaching(1);
358    return asyncTable.getScanner(scan);
359  }
360
361  private void putToTable(Table ht, byte[] rowkey) throws IOException {
362    Put put = new Put(rowkey);
363    put.addColumn(FAMILY, QUALIFIER, VALUE);
364    ht.put(put);
365  }
366
367  private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
368    public RegionServerWithScanTimeout(Configuration conf)
369      throws IOException, InterruptedException {
370      super(conf);
371    }
372
373    @Override
374    protected RSRpcServices createRpcServices() throws IOException {
375      return new RSRpcServicesWithScanTimeout(this);
376    }
377  }
378
379  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
380    private long tableScannerId;
381
382    private static long seqNoToThrowOn = -1;
383    private static boolean throwAlways = false;
384    private static boolean threw;
385
386    private static long seqNoToSleepOn = -1;
387    private static boolean sleepOnOpen = false;
388    private static volatile boolean slept;
389    private static int tryNumber = 0;
390
391    private static int sleepTime = rpcTimeout + 500;
392
393    public static void setSleepForTimeout(int timeout) {
394      sleepTime = timeout + 500;
395    }
396
397    public static void reset() {
398      setSleepForTimeout(scanTimeout);
399
400      seqNoToSleepOn = -1;
401      seqNoToThrowOn = -1;
402      throwAlways = false;
403      threw = false;
404      sleepOnOpen = false;
405      slept = false;
406      tryNumber = 0;
407    }
408
409    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
410      super(rs);
411    }
412
413    @Override
414    public ScanResponse scan(final RpcController controller, final ScanRequest request)
415      throws ServiceException {
416      if (request.hasScannerId()) {
417        ScanResponse scanResponse = super.scan(controller, request);
418        if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
419          return scanResponse;
420        }
421
422        if (
423          throwAlways
424            || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
425        ) {
426          threw = true;
427          tryNumber++;
428          LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
429            tableScannerId);
430          throw new ServiceException(new OutOfOrderScannerNextException());
431        }
432
433        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
434          try {
435            LOG.info("SLEEPING " + sleepTime);
436            Thread.sleep(sleepTime);
437          } catch (InterruptedException e) {
438          }
439          slept = true;
440          tryNumber++;
441        }
442        return scanResponse;
443      } else {
444        ScanResponse scanRes = super.scan(controller, request);
445        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
446        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
447          tableScannerId = scanRes.getScannerId();
448          if (sleepOnOpen) {
449            try {
450              LOG.info("openScanner SLEEPING " + sleepTime);
451              Thread.sleep(sleepTime);
452            } catch (InterruptedException e) {
453            }
454          }
455        }
456        return scanRes;
457      }
458    }
459  }
460}