001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantLock; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.CompatibilityFactory; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.MiniHBaseCluster; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.ResultScanner; 038import org.apache.hadoop.hbase.client.RetriesExhaustedException; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 043import org.apache.hadoop.hbase.ipc.CallTimeoutException; 044import org.apache.hadoop.hbase.test.MetricsAssertHelper; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.RegionServerTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Rule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.junit.rules.TestName; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 059import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 062 063@Category({ RegionServerTests.class, MediumTests.class }) 064public class TestScannerTimeoutHandling { 065 066 private static final Logger LOG = LoggerFactory.getLogger(TestScannerTimeoutHandling.class); 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestScannerTimeoutHandling.class); 071 072 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 073 private static final MetricsAssertHelper METRICS_ASSERT = 074 CompatibilityFactory.getInstance(MetricsAssertHelper.class); 075 private static final int TIMEOUT = 3000; 076 private static final TableName TABLE_NAME = TableName.valueOf("foo"); 077 private static Connection CONN; 078 079 @Rule 080 public TestName name = new TestName(); 081 082 @BeforeClass 083 public static void setUpBeforeClass() throws Exception { 084 Configuration conf = TEST_UTIL.getConfiguration(); 085 // Don't report so often so easier to see other rpcs 086 conf.setInt("hbase.regionserver.msginterval", 3 * 10000); 087 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); 088 // conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); 089 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); 090 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 091 TEST_UTIL.startMiniCluster(1); 092 TEST_UTIL.createTable(TABLE_NAME, "0"); 093 094 CONN = ConnectionFactory.createConnection(conf); 095 } 096 097 @AfterClass 098 public static void tearDownAfterClass() throws Exception { 099 CONN.close(); 100 TEST_UTIL.shutdownMiniCluster(); 101 } 102 103 /** 104 * If a client's timeout would be exceeded before scan results are ready, there is no point 105 * returning results to the client. Worse, for openScanner calls, the client cannot close the 106 * timed out scanner so this leaks scanners on the server. This test verifies that we properly 107 * track and cleanup scanners when a client timeout is exceeded. This should be more rare when 108 * heartbeats are enabled, since we try to return before timeout there. But it's still possible if 109 * queueing eats up most of the timeout or the inner workings of the scan were slowed down enough 110 * to still exceed the timeout despite the calculated heartbeat deadline. 111 */ 112 @Test 113 public void testExceededClientDeadline() throws Exception { 114 Table table = CONN.getTable(TABLE_NAME); 115 116 // put some rows so that our scanner doesn't complete on the first rpc. 117 // this would prematurely close the scanner before the timeout handling has a chance to 118 for (int i = 0; i < 10; i++) { 119 table.put(new Put(Bytes.toBytes(i)).addColumn(new byte[] { '0' }, new byte[] { '0' }, 120 new byte[] { '0' })); 121 } 122 123 try { 124 ResultScanner scanner = table.getScanner(new Scan().setCaching(1).setMaxResultSize(1)); 125 scanner.next(); 126 } catch (RetriesExhaustedException e) { 127 assertTrue(e.getCause() instanceof CallTimeoutException); 128 } finally { 129 // ensure the scan has finished on the server side 130 RSRpcServicesWithScanTimeout.lock.tryLock(60, TimeUnit.SECONDS); 131 // there should be 0 running scanners, meaning the scanner was properly closed on the server 132 assertEquals(0, RSRpcServicesWithScanTimeout.scannerCount); 133 // we should have caught the expected exception 134 assertTrue(RSRpcServicesWithScanTimeout.caughtTimeoutException); 135 // we should have incremented the callTimedOut metric 136 METRICS_ASSERT.assertCounterGt("exceptions.callTimedOut", 0, TEST_UTIL.getHBaseCluster() 137 .getRegionServer(0).getRpcServer().getMetrics().getMetricsSource()); 138 } 139 } 140 141 private static class RegionServerWithScanTimeout 142 extends MiniHBaseCluster.MiniHBaseClusterRegionServer { 143 public RegionServerWithScanTimeout(Configuration conf) 144 throws IOException, InterruptedException { 145 super(conf); 146 } 147 148 @Override 149 protected RSRpcServices createRpcServices() throws IOException { 150 return new RSRpcServicesWithScanTimeout(this); 151 } 152 } 153 154 private static class RSRpcServicesWithScanTimeout extends RSRpcServices { 155 156 private static boolean caughtTimeoutException = false; 157 private static int scannerCount = -1; 158 private static final Lock lock = new ReentrantLock(); 159 160 public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { 161 super(rs); 162 } 163 164 @Override 165 public ClientProtos.ScanResponse scan(final RpcController controller, 166 final ClientProtos.ScanRequest request) throws ServiceException { 167 168 String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); 169 if (regionName.contains(TABLE_NAME.getNameAsString())) { 170 171 // if the client's timeout is exceeded, it may either retry or attempt to close 172 // the scanner. we don't want to allow either until we've verified the server handling. 173 // so only allow 1 request at a time to our test table 174 try { 175 if (!lock.tryLock(60, TimeUnit.SECONDS)) { 176 throw new ServiceException("Failed to get lock"); 177 } 178 } catch (InterruptedException e) { 179 throw new ServiceException(e); 180 } 181 182 try { 183 LOG.info("SLEEPING"); 184 Thread.sleep(TIMEOUT * 2); 185 } catch (Exception e) { 186 } 187 188 try { 189 return super.scan(controller, request); 190 } catch (ServiceException e) { 191 if ( 192 e.getCause() instanceof ScannerResetException 193 && e.getCause().getCause() instanceof TimeoutIOException 194 ) { 195 LOG.info("caught EXPECTED exception in scan after sleep", e); 196 caughtTimeoutException = true; 197 } else { 198 LOG.warn("caught UNEXPECTED exception in scan after sleep", e); 199 } 200 } finally { 201 scannerCount = getScannersCount(); 202 lock.unlock(); 203 } 204 } 205 206 return super.scan(controller, request); 207 208 } 209 } 210}