/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Verifies that the server-side scanner honors the configured max-result-size limit in terms of
 * <b>block bytes scanned</b>, across the various code paths where a row or cell can be filtered
 * out (filterRowKey, filterRow, filterRowCells, per-cell filtering, and seek hints). Each test
 * asserts exact block-byte and RPC counts, so the fixture (row/column layout, block size, and
 * max result size) must not be changed without recomputing the expected values.
 */
@Tag(LargeTests.TAG)
public class TestScannerBlockSizeLimits {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
  private static final byte[] FAMILY1 = Bytes.toBytes("0");
  private static final byte[] FAMILY2 = Bytes.toBytes("1");

  // 1000-byte value: with key/header overhead, roughly one value fills a 2048-byte block.
  private static final byte[] DATA = new byte[1000];
  private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };

  private static final byte[] COLUMN1 = Bytes.toBytes(0);
  private static final byte[] COLUMN2 = Bytes.toBytes(1);
  private static final byte[] COLUMN3 = Bytes.toBytes(2);
  private static final byte[] COLUMN5 = Bytes.toBytes(5);

  /**
   * Starts a single-RS mini cluster with a small server-side max result size (4200 bytes, i.e.
   * roughly 2-3 of our 2048-byte blocks per RPC) and loads the fixed test data set.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
    createTestData();
  }

  /** Shuts down the mini cluster started in {@link #setUp()} so it doesn't leak across suites. */
  @AfterAll
  public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Clears the block cache before each test so every test measures actual block IO from the
   * storefiles rather than cache hits, keeping the expected block-byte counts deterministic.
   */
  @BeforeEach
  public void setupEach() throws Exception {
    HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    for (HRegion region : regionServer.getRegions(TABLE)) {
      System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
      regionServer.clearRegionBlockCache(region);
    }
  }

  /**
   * Writes 9 rows (keys 1..9): 6 columns of 1000 bytes each in FAMILY1 and a single column in
   * FAMILY2, flushing every other row so the region ends up with 5 storefiles per family. Puts go
   * directly through the HRegion so flush timing is fully controlled by the test.
   */
  private static void createTestData() throws IOException, InterruptedException {
    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE)) {
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);

      for (int i = 1; i < 10; i++) {
        // 6 columns per row in FAMILY1 (j = 0..5).
        // Each column value is 1000 bytes, which is enough to fill a full block with row and
        // header. So several blocks per row in FAMILY1.
        Put put = new Put(Bytes.toBytes(i));
        for (int j = 0; j < 6; j++) {
          put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
        }

        // Additional block in FAMILY2 (notably smaller than block size)
        put.addColumn(FAMILY2, COLUMN1, DATA);

        region.put(put);

        if (i % 2 == 0) {
          region.flush(true);
        }
      }

      // we've created 10 storefiles at this point, 5 per family
      region.flush(true);
    }
  }

  /**
   * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in
   * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2
   * requests.
   */
  @Test
  public void testSingleBlock() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table.getScanner(
        getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
          .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2)
          .setReadType(Scan.ReadType.STREAM))) {

      ScanMetrics metrics = scanner.getScanMetrics();

      scanner.next(100);

      // we fetch 2 columns from 1 row, so about 2 blocks
      assertEquals(4120, metrics.countOfBlockBytesScanned.get());
      assertEquals(1, metrics.countOfRowsScanned.get());
      assertEquals(1, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * Tests that we check size limit after filterRowKey. When filterRowKey, we call nextRow to skip
   * to next row. This should be efficient in this case, but we still need to check size limits
   * after each row is processed. So in this test, we accumulate some block IO reading row 1, then
   * skip row 2 and should return early at that point. The next rpc call starts with row3 blocks
   * loaded, so can return the whole row in one rpc. If we were not checking size limits, we'd have
   * been able to load an extra row 3 cell into the first rpc and thus split row 3 across multiple
   * Results.
   */
  @Test
  public void testCheckLimitAfterFilterRowKey() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1)
        .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2)
        .setFilter(
          new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2)))))) {

      ScanMetrics metrics = scanner.getScanMetrics();

      boolean foundRow3 = false;
      for (Result result : scanner) {
        Set<Integer> rows = new HashSet<>();
        for (Cell cell : result.rawCells()) {
          rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
        }
        if (rows.contains(3)) {
          assertFalse(foundRow3,
            "expected row3 to come all in one result, but found it in two results");
          assertEquals(1, rows.size());
          foundRow3 = true;
        }
      }

      // 22 blocks, last one is 1030 bytes (approx 3 per row for 8 rows, but some compaction
      // happens in family2 since each row only has 1 cell there and 2 can fit per block)
      assertEquals(44290, metrics.countOfBlockBytesScanned.get());
      // We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at end
      assertEquals(10, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to
   * filter.filterRow(), nextRow() is called which might accumulate more block IO. Validates that
   * in this case we still honor block limits.
   */
  @Test
  public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true)
        .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)
        .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL,
          new BinaryComparator(Bytes.toBytes("dasfasf"))))))) {

      // Our filter doesn't end up matching any real columns, so expect only cursors
      for (Result result : scanner) {
        assertTrue(result.isCursor());
      }

      ScanMetrics metrics = scanner.getScanMetrics();

      // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total
      assertEquals(18540, metrics.countOfBlockBytesScanned.get());
      // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd)
      // so that's 3 RPC and the last RPC pulls the cells loaded by the last block
      assertEquals(4, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch
   * block size being exceeded while filtering cells within a store. Test to ensure that we do
   * that, otherwise we'd see no cursors below.
   */
  @Test
  public void testCheckLimitAfterFilteringCell() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table.getScanner(getBaseScan()
        .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2))))) {

      int cursors = 0;
      for (Result result : scanner) {
        if (result.isCursor()) {
          cursors++;
        }
      }
      ScanMetrics metrics = scanner.getScanMetrics();

      // 9 rows, total of 32 blocks (last one is 1030)
      assertEquals(64890, metrics.countOfBlockBytesScanned.get());
      // We can return 32 blocks in approx 11 RPCs but we need 2 cursors due to the narrow filter
      assertEquals(2, cursors);
      assertEquals(11, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to
   * filter.filterRowCells(), we fall through to a final results.isEmpty() check near the end of
   * the method. If results are empty at this point (which they are), nextRow() is called which
   * might accumulate more block IO. Validates that in this case we still honor block limits.
   */
  @Test
  public void testCheckLimitAfterFilteringRowCells() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table
        .getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1)
          .setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1,
            COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA))))) {

      // Since we use SingleColumnValueExcludeFilter and dont include any other columns, the
      // column we load to test ends up being excluded from the result. So we only expect cursors
      // here.
      for (Result result : scanner) {
        assertTrue(result.isCursor());
      }

      ScanMetrics metrics = scanner.getScanMetrics();

      // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW.
      // So we load 1 block per row, and there are 9 rows. So 9 blocks
      assertEquals(18540, metrics.countOfBlockBytesScanned.get());
      // We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0)
      assertEquals(4, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * Tests that when we seek over blocks we dont include them in the block size of the request
   */
  @Test
  public void testSeekNextUsingHint() throws IOException {
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE);
      ResultScanner scanner = table.getScanner(
        getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5)))) {

      scanner.next(100);
      ScanMetrics metrics = scanner.getScanMetrics();

      // We have to read the first cell/block of each row, then can skip to the last block. So
      // that's 2 blocks per row to read (18 blocks total)
      assertEquals(37080, metrics.countOfBlockBytesScanned.get());
      // Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region.
      assertEquals(7, metrics.countOfRPCcalls.get());
    }
  }

  /**
   * We enable cursors and partial results to give us more granularity over counting of results,
   * and we enable STREAM so that no auto switching from pread to stream occurs -- this throws off
   * the rpc counts.
   */
  private Scan getBaseScan() {
    return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
      .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
  }
}