001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
021import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.net.SocketTimeoutException;
028import java.util.concurrent.TimeUnit;
029import java.util.function.Supplier;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.MiniHBaseCluster;
035import org.apache.hadoop.hbase.NamespaceDescriptor;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
038import org.apache.hadoop.hbase.ipc.CallTimeoutException;
039import org.apache.hadoop.hbase.regionserver.HRegionServer;
040import org.apache.hadoop.hbase.regionserver.RSRpcServices;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.AfterClass;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
059
060@Category({ MediumTests.class, ClientTests.class })
061public class TestClientScannerTimeouts {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
066
067  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
068  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
069
070  private static AsyncConnection ASYNC_CONN;
071  private static Connection CONN;
072  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
073  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
074  private static final byte[] VALUE = Bytes.toBytes("testValue");
075
076  private static final byte[] ROW0 = Bytes.toBytes("row-0");
077  private static final byte[] ROW1 = Bytes.toBytes("row-1");
078  private static final byte[] ROW2 = Bytes.toBytes("row-2");
079  private static final byte[] ROW3 = Bytes.toBytes("row-3");
080  private static final int rpcTimeout = 1000;
081  private static final int scanTimeout = 3 * rpcTimeout;
082  private static final int metaScanTimeout = 6 * rpcTimeout;
083  private static final int CLIENT_RETRIES_NUMBER = 3;
084
085  private static TableName tableName;
086
087  @Rule
088  public TestName name = new TestName();
089
  /**
   * Starts a mini cluster that uses {@link RegionServerWithScanTimeout} as its region server
   * implementation, then opens the synchronous and asynchronous connections shared by all tests.
   * @throws Exception if the cluster cannot be started or a connection cannot be created
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Don't report so often so easier to see other rpcs
    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
    // Swap in the region server whose rpc services can delay or fail scan calls on demand.
    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
    TEST_UTIL.startMiniCluster(1);

    // The scanner and meta timeouts are written only after the cluster has started and before
    // the connections are created.
    // NOTE(review): presumably so that only the client connections pick them up - confirm.
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
    CONN = ConnectionFactory.createConnection(conf);
  }
107
108  @AfterClass
109  public static void tearDownAfterClass() throws Exception {
110    CONN.close();
111    ASYNC_CONN.close();
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  public void setup(boolean isSystemTable) throws IOException {
116    RSRpcServicesWithScanTimeout.reset();
117
118    String nameAsString = name.getMethodName();
119    if (isSystemTable) {
120      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
121    }
122    tableName = TableName.valueOf(nameAsString);
123    TEST_UTIL.createTable(tableName, FAMILY);
124
125    Table table = CONN.getTable(tableName);
126    putToTable(table, ROW0);
127    putToTable(table, ROW1);
128    putToTable(table, ROW2);
129    putToTable(table, ROW3);
130    LOG.info("Wrote our four values");
131
132    table.getRegionLocator().getAllRegionLocations();
133
134    // reset again incase the creation/population caused anything to trigger
135    RSRpcServicesWithScanTimeout.reset();
136  }
137
138  private void expectRow(byte[] expected, Result result) {
139    assertTrue("Expected row: " + Bytes.toString(expected),
140      Bytes.equals(expected, result.getRow()));
141  }
142
143  private void expectNumTries(int expected) {
144    assertEquals(
145      "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
146      expected, RSRpcServicesWithScanTimeout.tryNumber);
147    // reset for next
148    RSRpcServicesWithScanTimeout.tryNumber = 0;
149  }
150
151  /**
152   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
153   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
154   * erroneous next() call. This is relatively hard to do these days because the server attempts to
155   * always return before the timeout. In this test we force the server to throw this exception, so
156   * that we can test the retry logic appropriately.
157   */
158  @Test
159  public void testRetryOutOfOrderScannerNextException() throws IOException {
160    expectRetryOutOfOrderScannerNext(() -> getScanner(CONN));
161  }
162
163  /**
164   * AsyncTable version of above
165   */
166  @Test
167  public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
168    expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
169  }
170
171  /**
172   * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a
173   * special connection which has retries disabled, because otherwise the scanner will retry the
174   * timed out next() call and mess up the test.
175   */
176  @Test
177  public void testNormalScanTimeoutOnNext() throws IOException {
178    setup(false);
179    // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
180    // will retry until scannerTimeout. This makes it more difficult to test the timeouts
181    // of normal next() calls. So we use a separate connection here which has retries disabled.
182    Configuration confNoRetries = new Configuration(CONN.getConfiguration());
183    confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
184    try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
185      expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
186    }
187  }
188
189  /**
190   * AsyncTable version of above
191   */
192  @Test
193  public void testNormalScanTimeoutOnNextAsync() throws IOException {
194    setup(false);
195    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
196  }
197
198  /**
199   * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for
200   * meta scans
201   */
202  @Test
203  public void testNormalScanTimeoutOnOpenScanner() throws IOException {
204    setup(false);
205    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
206  }
207
208  /**
209   * AsyncTable version of above
210   */
211  @Test
212  public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
213    setup(false);
214    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
215  }
216
217  /**
218   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
219   * next() calls in meta scans
220   */
221  @Test
222  public void testMetaScanTimeoutOnNext() throws IOException {
223    setup(true);
224    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
225  }
226
227  /**
228   * AsyncTable version of above
229   */
230  @Test
231  public void testMetaScanTimeoutOnNextAsync() throws IOException {
232    setup(true);
233    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
234  }
235
236  /**
237   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
238   * openScanner() calls for meta scans
239   */
240  @Test
241  public void testMetaScanTimeoutOnOpenScanner() throws IOException {
242    setup(true);
243    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
244  }
245
246  /**
247   * AsyncTable version of above
248   */
249  @Test
250  public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
251    setup(true);
252    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
253  }
254
255  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
256    throws IOException {
257    setup(false);
258    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
259
260    LOG.info(
261      "Opening scanner, expecting no errors from first next() call from openScanner response");
262    ResultScanner scanner = scannerSupplier.get();
263    Result result = scanner.next();
264    expectRow(ROW0, result);
265    expectNumTries(0);
266
267    LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
268    result = scanner.next();
269    expectRow(ROW1, result);
270    expectNumTries(0);
271
272    LOG.info(
273      "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
274    result = scanner.next();
275    expectRow(ROW2, result);
276    expectNumTries(1);
277
278    // reset so no errors. since last call restarted the scan and following
279    // call would otherwise fail
280    RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
281
282    LOG.info("Finishing scan, expecting no errors");
283    result = scanner.next();
284    expectRow(ROW3, result);
285    scanner.close();
286
287    LOG.info("Testing always throw exception");
288    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
289    int i = 0;
290
291    // test the case that RPC always throws
292    scanner = scannerSupplier.get();
293    RSRpcServicesWithScanTimeout.throwAlways = true;
294
295    while (true) {
296      LOG.info("Calling scanner.next()");
297      result = scanner.next();
298      if (result == null) {
299        break;
300      } else {
301        byte[] expectedResult = expectedResults[i++];
302        expectRow(expectedResult, result);
303      }
304    }
305
306    // ensure we verified all rows. this along with the expectRow check above
307    // proves that we didn't miss any rows.
308    assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
309      + ", actual index=" + i, expectedResults.length, i);
310
311    // expect all but the first row (which came from initial openScanner) to have thrown an error
312    expectNumTries(expectedResults.length - 1);
313
314  }
315
316  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
317    throws IOException {
318    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
319    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
320
321    LOG.info(
322      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
323    ResultScanner scanner = scannerSupplier.get();
324    Result result = scanner.next();
325    expectRow(ROW0, result);
326
327    LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
328    result = scanner.next();
329    expectRow(ROW1, result);
330
331    LOG.info("Making second next() RPC, expecting timeout");
332    long start = System.nanoTime();
333    try {
334      scanner.next();
335      fail("Expected CallTimeoutException");
336    } catch (RetriesExhaustedException e) {
337      assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException
338        || e.getCause() instanceof SocketTimeoutException);
339    }
340    expectTimeout(start, timeout);
341  }
342
343  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
344    throws IOException {
345    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
346    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
347    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
348    long start = System.nanoTime();
349    try {
350      scannerSupplier.get().next();
351      fail("Expected SocketTimeoutException or CallTimeoutException");
352    } catch (RetriesExhaustedException e) {
353      LOG.info("Got error", e);
354      assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(),
355        e.getCause() instanceof CallTimeoutException
356          || e.getCause() instanceof SocketTimeoutException);
357    }
358    expectTimeout(start, timeout);
359  }
360
361  private void expectTimeout(long start, int timeout) {
362    long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
363    LOG.info("Expected duration >= {}, and got {}", timeout, duration);
364    assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout);
365  }
366
  /**
   * Opens a scanner over the current test table through the shared synchronous connection.
   * @return the scanner produced by {@link #getScanner(Connection)} for {@code CONN}
   */
  private ResultScanner getScanner() {
    return getScanner(CONN);
  }
370
371  private ResultScanner getScanner(Connection conn) {
372    Scan scan = new Scan();
373    scan.setCaching(1);
374    try {
375      return conn.getTable(tableName).getScanner(scan);
376    } catch (IOException e) {
377      throw new RuntimeException(e);
378    }
379  }
380
381  private ResultScanner getAsyncScanner() {
382    Scan scan = new Scan();
383    scan.setCaching(1);
384    return ASYNC_CONN.getTable(tableName).getScanner(scan);
385  }
386
387  private void putToTable(Table ht, byte[] rowkey) throws IOException {
388    Put put = new Put(rowkey);
389    put.addColumn(FAMILY, QUALIFIER, VALUE);
390    ht.put(put);
391  }
392
  /**
   * Mini cluster region server that serves RPCs through {@link RSRpcServicesWithScanTimeout}.
   * It is registered under {@link HConstants#REGION_SERVER_IMPL} in
   * {@link #setUpBeforeClass()}.
   */
  private static class RegionServerWithScanTimeout
    extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
    public RegionServerWithScanTimeout(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    /**
     * Supplies the fault-injecting rpc services in place of the ones the superclass would create.
     */
    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServicesWithScanTimeout(this);
    }
  }
405
406  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
407    private long tableScannerId;
408
409    private static long seqNoToThrowOn = -1;
410    private static boolean throwAlways = false;
411    private static boolean threw;
412
413    private static long seqNoToSleepOn = -1;
414    private static boolean sleepOnOpen = false;
415    private static volatile boolean slept;
416    private static int tryNumber = 0;
417
418    private static int sleepTime = rpcTimeout + 500;
419
420    public static void setSleepForTimeout(int timeout) {
421      sleepTime = timeout + 500;
422    }
423
424    public static void reset() {
425      setSleepForTimeout(scanTimeout);
426
427      seqNoToSleepOn = -1;
428      seqNoToThrowOn = -1;
429      throwAlways = false;
430      threw = false;
431      sleepOnOpen = false;
432      slept = false;
433      tryNumber = 0;
434    }
435
436    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
437      super(rs);
438    }
439
440    @Override
441    public ScanResponse scan(final RpcController controller, final ScanRequest request)
442      throws ServiceException {
443      if (request.hasScannerId()) {
444        LOG.info("Got request {}", request);
445        ScanResponse scanResponse = super.scan(controller, request);
446        if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
447          return scanResponse;
448        }
449
450        if (
451          throwAlways
452            || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
453        ) {
454          threw = true;
455          tryNumber++;
456          LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
457            tableScannerId);
458          throw new ServiceException(new OutOfOrderScannerNextException());
459        }
460
461        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
462          try {
463            LOG.info("SLEEPING " + sleepTime);
464            Thread.sleep(sleepTime);
465          } catch (InterruptedException e) {
466          }
467          slept = true;
468          tryNumber++;
469        }
470        return scanResponse;
471      } else {
472        ScanResponse scanRes = super.scan(controller, request);
473        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
474        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
475          tableScannerId = scanRes.getScannerId();
476          if (sleepOnOpen) {
477            try {
478              LOG.info("openScanner SLEEPING " + sleepTime);
479              Thread.sleep(sleepTime);
480            } catch (InterruptedException e) {
481            }
482          }
483        }
484        return scanRes;
485      }
486    }
487  }
488}