001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.locks.Lock;
026import java.util.concurrent.locks.ReentrantLock;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.CompatibilityFactory;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.client.Put;
036import org.apache.hadoop.hbase.client.ResultScanner;
037import org.apache.hadoop.hbase.client.RetriesExhaustedException;
038import org.apache.hadoop.hbase.client.Scan;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.exceptions.ScannerResetException;
041import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
042import org.apache.hadoop.hbase.ipc.CallTimeoutException;
043import org.apache.hadoop.hbase.test.MetricsAssertHelper;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.RegionServerTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.jupiter.api.AfterAll;
048import org.junit.jupiter.api.BeforeAll;
049import org.junit.jupiter.api.Tag;
050import org.junit.jupiter.api.Test;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
056
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
058
059@Tag(RegionServerTests.TAG)
060@Tag(MediumTests.TAG)
061public class TestScannerTimeoutHandling {
062
063  private static final Logger LOG = LoggerFactory.getLogger(TestScannerTimeoutHandling.class);
064  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
065  private static final MetricsAssertHelper METRICS_ASSERT =
066    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
067  private static final int TIMEOUT = 3000;
068  private static final TableName TABLE_NAME = TableName.valueOf("foo");
069  private static Connection CONN;
070
071  @BeforeAll
072  public static void setUpBeforeClass() throws Exception {
073    Configuration conf = TEST_UTIL.getConfiguration();
074    // Don't report so often so easier to see other rpcs
075    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
076    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
077    // conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
078    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
079    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
080    TEST_UTIL.startMiniCluster(1);
081    TEST_UTIL.createTable(TABLE_NAME, "0");
082
083    CONN = ConnectionFactory.createConnection(conf);
084  }
085
086  @AfterAll
087  public static void tearDownAfterClass() throws Exception {
088    CONN.close();
089    TEST_UTIL.shutdownMiniCluster();
090  }
091
092  /**
093   * If a client's timeout would be exceeded before scan results are ready, there is no point
094   * returning results to the client. Worse, for openScanner calls, the client cannot close the
095   * timed out scanner so this leaks scanners on the server. This test verifies that we properly
096   * track and cleanup scanners when a client timeout is exceeded. This should be more rare when
097   * heartbeats are enabled, since we try to return before timeout there. But it's still possible if
098   * queueing eats up most of the timeout or the inner workings of the scan were slowed down enough
099   * to still exceed the timeout despite the calculated heartbeat deadline.
100   */
101  @Test
102  public void testExceededClientDeadline() throws Exception {
103    Table table = CONN.getTable(TABLE_NAME);
104
105    // put some rows so that our scanner doesn't complete on the first rpc.
106    // this would prematurely close the scanner before the timeout handling has a chance to
107    for (int i = 0; i < 10; i++) {
108      table.put(new Put(Bytes.toBytes(i)).addColumn(new byte[] { '0' }, new byte[] { '0' },
109        new byte[] { '0' }));
110    }
111
112    try {
113      ResultScanner scanner = table.getScanner(new Scan().setCaching(1).setMaxResultSize(1));
114      scanner.next();
115    } catch (RetriesExhaustedException e) {
116      assertTrue(e.getCause() instanceof CallTimeoutException);
117    } finally {
118      // ensure the scan has finished on the server side
119      RSRpcServicesWithScanTimeout.lock.tryLock(60, TimeUnit.SECONDS);
120      // there should be 0 running scanners, meaning the scanner was properly closed on the server
121      assertEquals(0, RSRpcServicesWithScanTimeout.scannerCount);
122      // we should have caught the expected exception
123      assertTrue(RSRpcServicesWithScanTimeout.caughtTimeoutException);
124      // we should have incremented the callTimedOut metric
125      METRICS_ASSERT.assertCounterGt("exceptions.callTimedOut", 0, TEST_UTIL.getHBaseCluster()
126        .getRegionServer(0).getRpcServer().getMetrics().getMetricsSource());
127    }
128  }
129
  /**
   * Mini-cluster region server that swaps in {@link RSRpcServicesWithScanTimeout} so scan rpcs
   * against the test table can be delayed past the client's timeout. Installed via
   * {@code HConstants.REGION_SERVER_IMPL} in setup.
   */
  private static class RegionServerWithScanTimeout
    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
    public RegionServerWithScanTimeout(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServicesWithScanTimeout(this);
    }
  }
142
143  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
144
145    private static boolean caughtTimeoutException = false;
146    private static int scannerCount = -1;
147    private static final Lock lock = new ReentrantLock();
148
149    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
150      super(rs);
151    }
152
153    @Override
154    public ClientProtos.ScanResponse scan(final RpcController controller,
155      final ClientProtos.ScanRequest request) throws ServiceException {
156
157      String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
158      if (regionName.contains(TABLE_NAME.getNameAsString())) {
159
160        // if the client's timeout is exceeded, it may either retry or attempt to close
161        // the scanner. we don't want to allow either until we've verified the server handling.
162        // so only allow 1 request at a time to our test table
163        try {
164          if (!lock.tryLock(60, TimeUnit.SECONDS)) {
165            throw new ServiceException("Failed to get lock");
166          }
167        } catch (InterruptedException e) {
168          throw new ServiceException(e);
169        }
170
171        try {
172          LOG.info("SLEEPING");
173          Thread.sleep(TIMEOUT * 2);
174        } catch (Exception e) {
175        }
176
177        try {
178          return super.scan(controller, request);
179        } catch (ServiceException e) {
180          if (
181            e.getCause() instanceof ScannerResetException
182              && e.getCause().getCause() instanceof TimeoutIOException
183          ) {
184            LOG.info("caught EXPECTED exception in scan after sleep", e);
185            caughtTimeoutException = true;
186          } else {
187            LOG.warn("caught UNEXPECTED exception in scan after sleep", e);
188          }
189        } finally {
190          scannerCount = getScannersCount();
191          lock.unlock();
192        }
193      }
194
195      return super.scan(controller, request);
196
197    }
198  }
199}