001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertNull; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.Arrays; 025import java.util.Optional; 026import java.util.concurrent.atomic.AtomicReference; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.Connection; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Result; 038import org.apache.hadoop.hbase.client.ResultScanner; 039import org.apache.hadoop.hbase.client.Scan; 040import org.apache.hadoop.hbase.client.Table; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.ipc.RpcServer; 046import org.apache.hadoop.hbase.ipc.ServerCall; 047import org.apache.hadoop.hbase.testclassification.MediumTests; 048import org.apache.hadoop.hbase.testclassification.RegionServerTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.junit.AfterClass; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Rule; 054import org.junit.Test; 055import org.junit.experimental.categories.Category; 056import org.junit.rules.TestName; 057 058import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 059import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 060 061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 067 068@Category({ RegionServerTests.class, MediumTests.class }) 069public class TestShortCircuitGet { 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestShortCircuitGet.class); 073 074 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 075 private static final byte[] FAMILY = Bytes.toBytes("testFamily"); 076 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 077 private static final byte[] VALUE = Bytes.toBytes("testValue"); 078 079 static final byte[] r0 = Bytes.toBytes("row-0"); 080 static final byte[] r1 = Bytes.toBytes("row-1"); 081 static final byte[] r2 = Bytes.toBytes("row-2"); 082 static final byte[] r3 = Bytes.toBytes("row-3"); 083 static final byte[] r4 = Bytes.toBytes("row-4"); 084 static final byte[] r5 = Bytes.toBytes("row-5"); 085 static final byte[] r6 = Bytes.toBytes("row-6"); 086 static final TableName tableName = TableName.valueOf("TestShortCircuitGet"); 087 088 @Rule 089 public TestName name = new TestName(); 090 091 @BeforeClass 092 public static void setUpBeforeClass() throws Exception { 093 Configuration conf = TEST_UTIL.getConfiguration(); 094 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); 095 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); 096 097 conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); 098 conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 099 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 100 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); 101 TEST_UTIL.startMiniCluster(1); 102 } 103 104 @AfterClass 105 public static void tearDownAfterClass() throws Exception { 106 TEST_UTIL.shutdownMiniCluster(); 107 } 108 109 /** 110 * This test is for HBASE-26821,when we initiate get or scan in cp, the {@link RegionScanner} for 111 * get and scan is close when get or scan is completed. 112 */ 113 @Test 114 public void testScannerCloseWhenScanAndGetInCP() throws Exception { 115 Table table = null; 116 try { 117 table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 118 HConstants.DEFAULT_BLOCKSIZE, MyScanObserver.class.getName()); 119 putToTable(table, r0); 120 putToTable(table, r1); 121 putToTable(table, r2); 122 putToTable(table, r3); 123 putToTable(table, r4); 124 putToTable(table, r5); 125 putToTable(table, r6); 126 } finally { 127 if (table != null) { 128 table.close(); 129 } 130 } 131 132 final Configuration conf = TEST_UTIL.getConfiguration(); 133 ResultScanner resultScanner = null; 134 Connection conn = null; 135 Table clientTable = null; 136 try { 137 conn = ConnectionFactory.createConnection(conf); 138 clientTable = conn.getTable(tableName); 139 Scan scan = new Scan(); 140 scan.setCaching(1); 141 scan.withStartRow(r0, true).withStopRow(r1, true); 142 resultScanner = table.getScanner(scan); 143 Result result = resultScanner.next(); 144 assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow())); 145 result = resultScanner.next(); 146 assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow())); 147 assertNull(resultScanner.next()); 148 } finally { 149 if (resultScanner != null) { 150 resultScanner.close(); 151 } 152 if (clientTable != null) { 153 clientTable.close(); 154 } 155 if (conn != null) { 156 conn.close(); 157 } 158 } 159 160 assertTrue(MyRSRpcServices.exceptionRef.get() == null); 161 assertTrue(MyScanObserver.exceptionRef.get() == null); 162 } 163 164 private void putToTable(Table ht, byte[] rowkey) throws IOException { 165 Put put = new Put(rowkey); 166 put.addColumn(FAMILY, QUALIFIER, VALUE); 167 ht.put(put); 168 } 169 170 private static class MyRegionServer extends MiniHBaseClusterRegionServer { 171 public MyRegionServer(Configuration conf) throws IOException, InterruptedException { 172 super(conf); 173 } 174 175 @Override 176 protected RSRpcServices createRpcServices() throws IOException { 177 return new MyRSRpcServices(this); 178 } 179 } 180 181 private static class MyRSRpcServices extends RSRpcServices { 182 private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null); 183 184 public MyRSRpcServices(HRegionServer rs) throws IOException { 185 super(rs); 186 } 187 188 @Override 189 public MultiResponse multi(RpcController rpcc, MultiRequest request) throws ServiceException { 190 try { 191 if (!MyScanObserver.inCP) { 192 return super.multi(rpcc, request); 193 } 194 195 assertTrue(!RpcServer.getCurrentCall().isPresent()); 196 return super.multi(rpcc, request); 197 } catch (Throwable e) { 198 exceptionRef.set(e); 199 throw new ServiceException(e); 200 } 201 } 202 203 @Override 204 public ScanResponse scan(RpcController controller, ScanRequest request) 205 throws ServiceException { 206 try { 207 if (!MyScanObserver.inCP) { 208 return super.scan(controller, request); 209 } 210 211 HRegion region = null; 212 if (request.hasRegion()) { 213 region = this.getRegion(request.getRegion()); 214 } 215 216 if ( 217 region != null && TableName.isMetaTableName(region.getTableDescriptor().getTableName()) 218 ) { 219 return super.scan(controller, request); 220 } 221 222 assertTrue(!RpcServer.getCurrentCall().isPresent()); 223 return super.scan(controller, request); 224 } catch (Throwable e) { 225 exceptionRef.set(e); 226 throw new ServiceException(e); 227 } 228 } 229 230 @Override 231 public GetResponse get(RpcController controller, GetRequest request) throws ServiceException { 232 try { 233 if (!MyScanObserver.inCP) { 234 return super.get(controller, request); 235 } 236 237 HRegion region = null; 238 if (request.hasRegion()) { 239 region = this.getRegion(request.getRegion()); 240 } 241 if ( 242 region != null && TableName.isMetaTableName(region.getTableDescriptor().getTableName()) 243 ) { 244 return super.get(controller, request); 245 } 246 247 assertTrue(!RpcServer.getCurrentCall().isPresent()); 248 return super.get(controller, request); 249 } catch (Throwable e) { 250 exceptionRef.set(e); 251 throw new ServiceException(e); 252 } 253 } 254 } 255 256 public static class MyScanObserver implements RegionCoprocessor, RegionObserver { 257 258 private static volatile boolean inCP = false; 259 private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null); 260 261 @Override 262 public Optional<RegionObserver> getRegionObserver() { 263 return Optional.of(this); 264 } 265 266 @SuppressWarnings("rawtypes") 267 @Override 268 public RegionScanner postScannerOpen( 269 final ObserverContext<RegionCoprocessorEnvironment> observerContext, final Scan scan, 270 final RegionScanner regionScanner) throws IOException { 271 272 if (inCP) { 273 return regionScanner; 274 } 275 276 HRegion region = (HRegion) observerContext.getEnvironment().getRegion(); 277 int prevScannerCount = region.scannerReadPoints.size(); 278 Table table1 = null; 279 Get get2 = new Get(r2); 280 inCP = true; 281 try (Connection connection = observerContext.getEnvironment() 282 .createConnection(observerContext.getEnvironment().getConfiguration())) { 283 try { 284 table1 = connection.getTable(tableName); 285 Result result = table1.get(get2); 286 assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow())); 287 288 } finally { 289 if (table1 != null) { 290 table1.close(); 291 } 292 inCP = false; 293 } 294 295 // RegionScanner is closed and there is no rpcCallBack set 296 assertTrue(prevScannerCount == region.scannerReadPoints.size()); 297 ServerCall serverCall = (ServerCall) RpcServer.getCurrentCall().get(); 298 assertTrue(serverCall.getCallBack() == null); 299 300 Get get3 = new Get(r3); 301 Get get4 = new Get(r4); 302 Table table2 = null; 303 inCP = true; 304 try { 305 table2 = connection.getTable(tableName); 306 Result[] results = table2.get(Arrays.asList(get3, get4)); 307 assertTrue("Expected row: row-3", Bytes.equals(r3, results[0].getRow())); 308 assertTrue("Expected row: row-4", Bytes.equals(r4, results[1].getRow())); 309 } finally { 310 if (table2 != null) { 311 table2.close(); 312 } 313 inCP = false; 314 } 315 316 // RegionScanner is closed and there is no rpcCallBack set 317 assertTrue(prevScannerCount == region.scannerReadPoints.size()); 318 serverCall = (ServerCall) RpcServer.getCurrentCall().get(); 319 assertTrue(serverCall.getCallBack() == null); 320 321 Scan newScan = new Scan(); 322 newScan.setCaching(1); 323 newScan.withStartRow(r5, true).withStopRow(r6, true); 324 Table table3 = null; 325 ResultScanner resultScanner = null; 326 inCP = true; 327 try { 328 table3 = connection.getTable(tableName); 329 resultScanner = table3.getScanner(newScan); 330 Result result = resultScanner.next(); 331 assertTrue("Expected row: row-5", Bytes.equals(r5, result.getRow())); 332 result = resultScanner.next(); 333 assertTrue("Expected row: row-6", Bytes.equals(r6, result.getRow())); 334 result = resultScanner.next(); 335 assertNull(result); 336 } finally { 337 if (resultScanner != null) { 338 resultScanner.close(); 339 } 340 if (table3 != null) { 341 table3.close(); 342 } 343 inCP = false; 344 } 345 346 // RegionScanner is closed and there is no rpcCallBack set 347 assertTrue(prevScannerCount == region.scannerReadPoints.size()); 348 serverCall = (ServerCall) RpcServer.getCurrentCall().get(); 349 assertTrue(serverCall.getCallBack() == null); 350 return regionScanner; 351 } catch (Throwable e) { 352 exceptionRef.set(e); 353 throw e; 354 } 355 } 356 } 357 358}