/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

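/**
 * Verifies that the server properly tracks and cleans up scanners when a scan call exceeds the
 * client's rpc timeout, so that scanners are not leaked by clients that have already given up.
 * See {@link #testExceededClientDeadline()} for details.
 */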
@Category({ RegionServerTests.class, MediumTests.class })
public class TestScannerTimeoutHandling {

  private static final Logger LOG = LoggerFactory.getLogger(TestScannerTimeoutHandling.class);

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerTimeoutHandling.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final MetricsAssertHelper METRICS_ASSERT =
    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
  private static final int TIMEOUT = 3000;
  private static final TableName TABLE_NAME = TableName.valueOf("foo");
  private static Connection CONN;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    // Report less often so it is easier to see the other rpcs
    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT);
    // conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT);
    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, "0");

    CONN = ConnectionFactory.createConnection(conf);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * If a client's timeout would be exceeded before scan results are ready, there is no point
   * returning results to the client. Worse, for openScanner calls, the client cannot close the
   * timed-out scanner, so this leaks scanners on the server. This test verifies that we properly
   * track and clean up scanners when a client timeout is exceeded. This should be rarer when
   * heartbeats are enabled, since we try to return before the timeout there. But it's still
   * possible if queueing eats up most of the timeout or the inner workings of the scan are slowed
   * down enough to still exceed the timeout despite the calculated heartbeat deadline.
   */
  @Test
  public void testExceededClientDeadline() throws Exception {
    Table table = CONN.getTable(TABLE_NAME);

    // put some rows so that our scanner doesn't complete on the first rpc.
    // otherwise the scanner would be closed prematurely, before the timeout handling has a
    // chance to run
    for (int i = 0; i < 10; i++) {
      table.put(new Put(Bytes.toBytes(i)).addColumn(new byte[] { '0' }, new byte[] { '0' },
        new byte[] { '0' }));
    }

    try {
      ResultScanner scanner = table.getScanner(new Scan().setCaching(1).setMaxResultSize(1));
      scanner.next();
    } catch (RetriesExhaustedException e) {
      assertTrue(e.getCause() instanceof CallTimeoutException);
    } finally {
      // ensure the scan has finished on the server side
      assertTrue(RSRpcServicesWithScanTimeout.lock.tryLock(60, TimeUnit.SECONDS));
      // there should be 0 running scanners, meaning the scanner was properly closed on the server
      assertEquals(0, RSRpcServicesWithScanTimeout.scannerCount);
      // we should have caught the expected exception
      assertTrue(RSRpcServicesWithScanTimeout.caughtTimeoutException);
      // we should have incremented the callTimedOut metric
      METRICS_ASSERT.assertCounterGt("exceptions.callTimedOut", 0, TEST_UTIL.getHBaseCluster()
        .getRegionServer(0).getRpcServer().getMetrics().getMetricsSource());
    }
  }

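  /**
   * Region server that plugs in {@link RSRpcServicesWithScanTimeout} so that scan calls against
   * the test table can be stalled past the client's rpc timeout.
   */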
  private static class RegionServerWithScanTimeout
    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
    public RegionServerWithScanTimeout(Configuration conf)
      throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new RSRpcServicesWithScanTimeout(this);
    }
  }

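  /**
   * RSRpcServices that serializes scan calls against the test table and sleeps past the client's
   * rpc timeout before executing them. It records whether the expected server-side timeout
   * exception was observed ({@code caughtTimeoutException}) and how many scanners remained open
   * afterwards ({@code scannerCount}) so the test can assert on them.
   */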
  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {

    private static boolean caughtTimeoutException = false;
    private static int scannerCount = -1;
    private static final Lock lock = new ReentrantLock();

    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
      super(rs);
    }

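    /**
     * For scan requests against the test table: allow only one request at a time (guarded by
     * {@code lock}), sleep for twice the rpc timeout so the client's call times out, then
     * delegate to the real scan and record the remaining scanner count and whether the expected
     * timeout exception was thrown. Requests against any other table pass straight through.
     */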
    @Override
    public ClientProtos.ScanResponse scan(final RpcController controller,
      final ClientProtos.ScanRequest request) throws ServiceException {

      String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
      if (regionName.contains(TABLE_NAME.getNameAsString())) {

        // if the client's timeout is exceeded, it may either retry or attempt to close
        // the scanner. we don't want to allow either until we've verified the server handling.
        // so only allow 1 request at a time to our test table
        try {
          if (!lock.tryLock(60, TimeUnit.SECONDS)) {
            throw new ServiceException("Failed to get lock");
          }
        } catch (InterruptedException e) {
          throw new ServiceException(e);
        }

        try {
          LOG.info("SLEEPING");
          Thread.sleep(TIMEOUT * 2);
        } catch (Exception e) {
          // ignored; an interrupt only cuts the sleep short
        }

        try {
          return super.scan(controller, request);
        } catch (ServiceException e) {
          if (
            e.getCause() instanceof ScannerResetException
              && e.getCause().getCause() instanceof TimeoutIOException
          ) {
            LOG.info("caught EXPECTED exception in scan after sleep", e);
            caughtTimeoutException = true;
          } else {
            LOG.warn("caught UNEXPECTED exception in scan after sleep", e);
          }
        } finally {
          scannerCount = getScannersCount();
          lock.unlock();
        }
      }

      return super.scan(controller, request);
    }
  }
}