001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantLock; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.CompatibilityFactory; 029import org.apache.hadoop.hbase.HBaseTestingUtil; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.ResultScanner; 037import org.apache.hadoop.hbase.client.RetriesExhaustedException; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.exceptions.ScannerResetException; 041import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 042import org.apache.hadoop.hbase.ipc.CallTimeoutException; 043import org.apache.hadoop.hbase.test.MetricsAssertHelper; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.RegionServerTests; 046import org.apache.hadoop.hbase.util.Bytes; 047import org.junit.jupiter.api.AfterAll; 048import org.junit.jupiter.api.BeforeAll; 049import org.junit.jupiter.api.Tag; 050import org.junit.jupiter.api.Test; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 055import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 056 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 058 059@Tag(RegionServerTests.TAG) 060@Tag(MediumTests.TAG) 061public class TestScannerTimeoutHandling { 062 063 private static final Logger LOG = LoggerFactory.getLogger(TestScannerTimeoutHandling.class); 064 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 065 private static final MetricsAssertHelper METRICS_ASSERT = 066 CompatibilityFactory.getInstance(MetricsAssertHelper.class); 067 private static final int TIMEOUT = 3000; 068 private static final TableName TABLE_NAME = TableName.valueOf("foo"); 069 private static Connection CONN; 070 071 @BeforeAll 072 public static void setUpBeforeClass() throws Exception { 073 Configuration conf = TEST_UTIL.getConfiguration(); 074 // Don't report so often so easier to see other rpcs 075 conf.setInt("hbase.regionserver.msginterval", 3 * 10000); 076 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, TIMEOUT); 077 // conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, TIMEOUT); 078 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); 079 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); 080 TEST_UTIL.startMiniCluster(1); 081 TEST_UTIL.createTable(TABLE_NAME, "0"); 082 083 CONN = ConnectionFactory.createConnection(conf); 084 } 085 086 @AfterAll 087 public static void tearDownAfterClass() throws Exception { 088 CONN.close(); 089 TEST_UTIL.shutdownMiniCluster(); 090 } 091 092 /** 093 * If a client's timeout would be exceeded before scan results are ready, there is no point 094 * returning results to the client. Worse, for openScanner calls, the client cannot close the 095 * timed out scanner so this leaks scanners on the server. This test verifies that we properly 096 * track and cleanup scanners when a client timeout is exceeded. This should be more rare when 097 * heartbeats are enabled, since we try to return before timeout there. But it's still possible if 098 * queueing eats up most of the timeout or the inner workings of the scan were slowed down enough 099 * to still exceed the timeout despite the calculated heartbeat deadline. 100 */ 101 @Test 102 public void testExceededClientDeadline() throws Exception { 103 Table table = CONN.getTable(TABLE_NAME); 104 105 // put some rows so that our scanner doesn't complete on the first rpc. 106 // this would prematurely close the scanner before the timeout handling has a chance to 107 for (int i = 0; i < 10; i++) { 108 table.put(new Put(Bytes.toBytes(i)).addColumn(new byte[] { '0' }, new byte[] { '0' }, 109 new byte[] { '0' })); 110 } 111 112 try { 113 ResultScanner scanner = table.getScanner(new Scan().setCaching(1).setMaxResultSize(1)); 114 scanner.next(); 115 } catch (RetriesExhaustedException e) { 116 assertTrue(e.getCause() instanceof CallTimeoutException); 117 } finally { 118 // ensure the scan has finished on the server side 119 RSRpcServicesWithScanTimeout.lock.tryLock(60, TimeUnit.SECONDS); 120 // there should be 0 running scanners, meaning the scanner was properly closed on the server 121 assertEquals(0, RSRpcServicesWithScanTimeout.scannerCount); 122 // we should have caught the expected exception 123 assertTrue(RSRpcServicesWithScanTimeout.caughtTimeoutException); 124 // we should have incremented the callTimedOut metric 125 METRICS_ASSERT.assertCounterGt("exceptions.callTimedOut", 0, TEST_UTIL.getHBaseCluster() 126 .getRegionServer(0).getRpcServer().getMetrics().getMetricsSource()); 127 } 128 } 129 130 private static class RegionServerWithScanTimeout 131 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 132 public RegionServerWithScanTimeout(Configuration conf) 133 throws IOException, InterruptedException { 134 super(conf); 135 } 136 137 @Override 138 protected RSRpcServices createRpcServices() throws IOException { 139 return new RSRpcServicesWithScanTimeout(this); 140 } 141 } 142 143 private static class RSRpcServicesWithScanTimeout extends RSRpcServices { 144 145 private static boolean caughtTimeoutException = false; 146 private static int scannerCount = -1; 147 private static final Lock lock = new ReentrantLock(); 148 149 public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { 150 super(rs); 151 } 152 153 @Override 154 public ClientProtos.ScanResponse scan(final RpcController controller, 155 final ClientProtos.ScanRequest request) throws ServiceException { 156 157 String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); 158 if (regionName.contains(TABLE_NAME.getNameAsString())) { 159 160 // if the client's timeout is exceeded, it may either retry or attempt to close 161 // the scanner. we don't want to allow either until we've verified the server handling. 162 // so only allow 1 request at a time to our test table 163 try { 164 if (!lock.tryLock(60, TimeUnit.SECONDS)) { 165 throw new ServiceException("Failed to get lock"); 166 } 167 } catch (InterruptedException e) { 168 throw new ServiceException(e); 169 } 170 171 try { 172 LOG.info("SLEEPING"); 173 Thread.sleep(TIMEOUT * 2); 174 } catch (Exception e) { 175 } 176 177 try { 178 return super.scan(controller, request); 179 } catch (ServiceException e) { 180 if ( 181 e.getCause() instanceof ScannerResetException 182 && e.getCause().getCause() instanceof TimeoutIOException 183 ) { 184 LOG.info("caught EXPECTED exception in scan after sleep", e); 185 caughtTimeoutException = true; 186 } else { 187 LOG.warn("caught UNEXPECTED exception in scan after sleep", e); 188 } 189 } finally { 190 scannerCount = getScannersCount(); 191 lock.unlock(); 192 } 193 } 194 195 return super.scan(controller, request); 196 197 } 198 } 199}