001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertNull; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicReference; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.client.Result; 034import org.apache.hadoop.hbase.client.ResultScanner; 035import org.apache.hadoop.hbase.client.Scan; 036import org.apache.hadoop.hbase.client.Table; 037import org.apache.hadoop.hbase.io.ByteBuffAllocator; 038import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator; 039import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; 040import org.apache.hadoop.hbase.ipc.RpcCall; 041import org.apache.hadoop.hbase.ipc.RpcServer; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.testclassification.RegionServerTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.junit.jupiter.api.AfterAll; 046import org.junit.jupiter.api.BeforeAll; 047import org.junit.jupiter.api.Tag; 048import org.junit.jupiter.api.Test; 049 050import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 051import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 055 056@Tag(RegionServerTests.TAG) 057@Tag(LargeTests.TAG) 058public class TestRegionServerScan { 059 060 private static final byte[] CF = Bytes.toBytes("CF"); 061 private static final byte[] CQ = Bytes.toBytes("CQ"); 062 private static final byte[] VALUE = new byte[1200]; 063 064 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 065 private static final Configuration conf = TEST_UTIL.getConfiguration(); 066 private static Admin admin = null; 067 static final TableName tableName = TableName.valueOf("TestRegionServerScan"); 068 static final byte[] r0 = Bytes.toBytes("row-0"); 069 static final byte[] r1 = Bytes.toBytes("row-1"); 070 static final byte[] r2 = Bytes.toBytes("row-2"); 071 072 @BeforeAll 073 public static void setupBeforeClass() throws Exception { 074 /** 075 * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after 076 * released. 077 */ 078 conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS, 079 DeallocateRewriteByteBuffAllocator.class.getName()); 080 conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); 081 conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); 082 conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20); 083 conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048); 084 conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); 085 conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); 086 conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); 087 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); 088 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); 089 090 conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); 091 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 092 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); 093 conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024); 094 TEST_UTIL.startMiniCluster(1); 095 admin = TEST_UTIL.getAdmin(); 096 } 097 098 @AfterAll 099 public static void tearDownAfterClass() throws Exception { 100 TEST_UTIL.shutdownMiniCluster(); 101 } 102 103 @Test 104 public void testScannWhenRpcCallContextNull() throws Exception { 105 ResultScanner resultScanner = null; 106 Table table = null; 107 try { 108 table = TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null); 109 putToTable(table, r0); 110 putToTable(table, r1); 111 putToTable(table, r2); 112 113 admin.flush(table.getName()); 114 115 Scan scan = new Scan(); 116 scan.setCaching(2); 117 scan.withStartRow(r0, true).withStopRow(r2, true); 118 119 MyRSRpcServices.inTest = true; 120 resultScanner = table.getScanner(scan); 121 Result result = resultScanner.next(); 122 byte[] rowKey = result.getRow(); 123 assertTrue(Bytes.equals(r0, rowKey)); 124 125 result = resultScanner.next(); 126 rowKey = result.getRow(); 127 assertTrue(Bytes.equals(r1, rowKey)); 128 129 result = resultScanner.next(); 130 rowKey = result.getRow(); 131 assertTrue(Bytes.equals(r2, rowKey)); 132 assertNull(resultScanner.next()); 133 assertTrue(MyRSRpcServices.exceptionRef.get() == null); 134 } finally { 135 MyRSRpcServices.inTest = false; 136 if (resultScanner != null) { 137 resultScanner.close(); 138 } 139 if (table != null) { 140 table.close(); 141 } 142 } 143 } 144 145 private static void putToTable(Table table, byte[] rowkey) throws IOException { 146 Put put = new Put(rowkey); 147 put.addColumn(CF, CQ, VALUE); 148 table.put(put); 149 } 150 151 private static class MyRegionServer extends MiniHBaseClusterRegionServer { 152 public MyRegionServer(Configuration conf) throws IOException, InterruptedException { 153 super(conf); 154 } 155 156 @Override 157 protected RSRpcServices createRpcServices() throws IOException { 158 return new MyRSRpcServices(this); 159 } 160 } 161 162 private static class MyRSRpcServices extends RSRpcServices { 163 private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null); 164 private static volatile boolean inTest = false; 165 166 public MyRSRpcServices(HRegionServer rs) throws IOException { 167 super(rs); 168 } 169 170 @Override 171 public ScanResponse scan(RpcController controller, ScanRequest request) 172 throws ServiceException { 173 try { 174 if (!inTest) { 175 return super.scan(controller, request); 176 } 177 178 HRegion region = null; 179 if (request.hasRegion()) { 180 region = this.getRegion(request.getRegion()); 181 } 182 183 if (region != null && !tableName.equals(region.getTableDescriptor().getTableName())) { 184 return super.scan(controller, request); 185 } 186 187 ScanResponse result = null; 188 // Simulate RpcCallContext is null for test. 189 Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall(); 190 try { 191 result = super.scan(controller, request); 192 } finally { 193 rpcCall.ifPresent(RpcServer::setCurrentCall); 194 } 195 return result; 196 } catch (Throwable e) { 197 exceptionRef.set(e); 198 throw new ServiceException(e); 199 } 200 } 201 } 202 203}