001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
052
053@Tag(LargeTests.TAG)
054public class TestScannerBlockSizeLimits {
055
056  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
057  private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
058  private static final byte[] FAMILY1 = Bytes.toBytes("0");
059  private static final byte[] FAMILY2 = Bytes.toBytes("1");
060
061  private static final byte[] DATA = new byte[1000];
062  private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };
063
064  private static final byte[] COLUMN1 = Bytes.toBytes(0);
065  private static final byte[] COLUMN2 = Bytes.toBytes(1);
066  private static final byte[] COLUMN3 = Bytes.toBytes(2);
067  private static final byte[] COLUMN5 = Bytes.toBytes(5);
068
069  @BeforeAll
070  public static void setUp() throws Exception {
071    Configuration conf = TEST_UTIL.getConfiguration();
072    conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
073    TEST_UTIL.startMiniCluster(1);
074    TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
075    createTestData();
076  }
077
078  @BeforeEach
079  public void setupEach() throws Exception {
080    HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
081    for (HRegion region : regionServer.getRegions(TABLE)) {
082      System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
083      regionServer.clearRegionBlockCache(region);
084    }
085  }
086
087  private static void createTestData() throws IOException, InterruptedException {
088    RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
089    String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
090    HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);
091
092    for (int i = 1; i < 10; i++) {
093      // 5 columns per row, in 2 families
094      // Each column value is 1000 bytes, which is enough to fill a full block with row and header.
095      // So 5 blocks per row in FAMILY1
096      Put put = new Put(Bytes.toBytes(i));
097      for (int j = 0; j < 6; j++) {
098        put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
099      }
100
101      // Additional block in FAMILY2 (notably smaller than block size)
102      put.addColumn(FAMILY2, COLUMN1, DATA);
103
104      region.put(put);
105
106      if (i % 2 == 0) {
107        region.flush(true);
108      }
109    }
110
111    // we've created 10 storefiles at this point, 5 per family
112    region.flush(true);
113
114  }
115
116  /**
117   * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in
118   * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2
119   * requests.
120   */
121  @Test
122  public void testSingleBlock() throws IOException {
123    Table table = TEST_UTIL.getConnection().getTable(TABLE);
124
125    ResultScanner scanner =
126      table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
127        .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM));
128
129    ScanMetrics metrics = scanner.getScanMetrics();
130
131    scanner.next(100);
132
133    // we fetch 2 columns from 1 row, so about 2 blocks
134    assertEquals(4120, metrics.countOfBlockBytesScanned.get());
135    assertEquals(1, metrics.countOfRowsScanned.get());
136    assertEquals(1, metrics.countOfRPCcalls.get());
137  }
138
139  /**
140   * Tests that we check size limit after filterRowKey. When filterRowKey, we call nextRow to skip
141   * to next row. This should be efficient in this case, but we still need to check size limits
142   * after each row is processed. So in this test, we accumulate some block IO reading row 1, then
143   * skip row 2 and should return early at that point. The next rpc call starts with row3 blocks
144   * loaded, so can return the whole row in one rpc. If we were not checking size limits, we'd have
145   * been able to load an extra row 3 cell into the first rpc and thus split row 3 across multiple
146   * Results.
147   */
148  @Test
149  public void testCheckLimitAfterFilterRowKey() throws IOException {
150
151    Table table = TEST_UTIL.getConnection().getTable(TABLE);
152
153    ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1)
154      .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2)
155      .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2)))));
156
157    ScanMetrics metrics = scanner.getScanMetrics();
158
159    boolean foundRow3 = false;
160    for (Result result : scanner) {
161      Set<Integer> rows = new HashSet<>();
162      for (Cell cell : result.rawCells()) {
163        rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
164      }
165      if (rows.contains(3)) {
166        assertFalse(foundRow3,
167          "expected row3 to come all in one result, but found it in two results");
168        assertEquals(1, rows.size());
169        foundRow3 = true;
170      }
171    }
172
173    // 22 blocks, last one is 1030 bytes (approx 3 per row for 8 rows, but some compaction happens
174    // in family2 since each row only has 1 cell there and 2 can fit per block)
175    assertEquals(44290, metrics.countOfBlockBytesScanned.get());
176    // We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at end
177    assertEquals(10, metrics.countOfRPCcalls.get());
178  }
179
180  /**
181   * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to
182   * filter.filterRow(), nextRow() is called which might accumulate more block IO. Validates that in
183   * this case we still honor block limits.
184   */
185  @Test
186  public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException {
187    Table table = TEST_UTIL.getConnection().getTable(TABLE);
188
189    ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true)
190      .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)
191      .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL,
192        new BinaryComparator(Bytes.toBytes("dasfasf"))))));
193
194    // Our filter doesn't end up matching any real columns, so expect only cursors
195    for (Result result : scanner) {
196      assertTrue(result.isCursor());
197    }
198
199    ScanMetrics metrics = scanner.getScanMetrics();
200
201    // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total
202    assertEquals(18540, metrics.countOfBlockBytesScanned.get());
203    // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd)
204    // so that's 3 RPC and the last RPC pulls the cells loaded by the last block
205    assertEquals(4, metrics.countOfRPCcalls.get());
206  }
207
208  /**
209   * At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch
210   * block size being exceeded while filtering cells within a store. Test to ensure that we do that,
211   * otherwise we'd see no cursors below.
212   */
213  @Test
214  public void testCheckLimitAfterFilteringCell() throws IOException {
215    Table table = TEST_UTIL.getConnection().getTable(TABLE);
216
217    ResultScanner scanner = table.getScanner(getBaseScan()
218      .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2))));
219
220    int cursors = 0;
221    for (Result result : scanner) {
222      if (result.isCursor()) {
223        cursors++;
224      }
225    }
226    ScanMetrics metrics = scanner.getScanMetrics();
227    System.out.println(metrics.countOfBlockBytesScanned.get());
228
229    // 9 rows, total of 32 blocks (last one is 1030)
230    assertEquals(64890, metrics.countOfBlockBytesScanned.get());
231    // We can return 32 blocks in approx 11 RPCs but we need 2 cursors due to the narrow filter
232    assertEquals(2, cursors);
233    assertEquals(11, metrics.countOfRPCcalls.get());
234  }
235
236  /**
237   * After RegionScannerImpl.populateResults, row filters are run. If row is excluded due to
238   * filter.filterRowCells(), we fall through to a final results.isEmpty() check near the end of the
239   * method. If results are empty at this point (which they are), nextRow() is called which might
240   * accumulate more block IO. Validates that in this case we still honor block limits.
241   */
242  @Test
243  public void testCheckLimitAfterFilteringRowCells() throws IOException {
244    Table table = TEST_UTIL.getConnection().getTable(TABLE);
245
246    ResultScanner scanner = table
247      .getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1)
248        .setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1,
249          COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA))));
250
251    // Since we use SingleColumnValueExcludeFilter and dont include any other columns, the column
252    // we load to test ends up being excluded from the result. So we only expect cursors here.
253    for (Result result : scanner) {
254      assertTrue(result.isCursor());
255    }
256
257    ScanMetrics metrics = scanner.getScanMetrics();
258
259    // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW.
260    // So we load 1 block per row, and there are 9 rows. So 9 blocks
261    assertEquals(18540, metrics.countOfBlockBytesScanned.get());
262    // We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0)
263    assertEquals(4, metrics.countOfRPCcalls.get());
264  }
265
266  /**
267   * Tests that when we seek over blocks we dont include them in the block size of the request
268   */
269  @Test
270  public void testSeekNextUsingHint() throws IOException {
271    Table table = TEST_UTIL.getConnection().getTable(TABLE);
272
273    ResultScanner scanner = table.getScanner(
274      getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5)));
275
276    scanner.next(100);
277    ScanMetrics metrics = scanner.getScanMetrics();
278
279    // We have to read the first cell/block of each row, then can skip to the last block. So that's
280    // 2 blocks per row to read (18 blocks total)
281    assertEquals(37080, metrics.countOfBlockBytesScanned.get());
282    // Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region.
283    assertEquals(7, metrics.countOfRPCcalls.get());
284  }
285
286  /**
287   * We enable cursors and partial results to give us more granularity over counting of results, and
288   * we enable STREAM so that no auto switching from pread to stream occurs -- this throws off the
289   * rpc counts.
290   */
291  private Scan getBaseScan() {
292    return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
293      .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
294  }
295}