/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnPaginationFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SkipFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

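/**
 * Verifies that the server-side scanner honors the max result size limit
 * (HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY) in terms of block bytes scanned: an RPC
 * should return early once the blocks it has accumulated exceed the configured limit. The tests
 * below check this via the ScanMetrics block, row, and RPC counters.
 */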
@Category({ LargeTests.class })
public class TestScannerBlockSizeLimits {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
  private static final byte[] FAMILY1 = Bytes.toBytes("0");
  private static final byte[] FAMILY2 = Bytes.toBytes("1");

  private static final byte[] DATA = new byte[1000];
  private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };

  private static final byte[] COLUMN1 = Bytes.toBytes(0);
  private static final byte[] COLUMN2 = Bytes.toBytes(1);
  private static final byte[] COLUMN3 = Bytes.toBytes(2);
  private static final byte[] COLUMN5 = Bytes.toBytes(5);

  @BeforeClass
  public static void setUp() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
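    // A fully-packed data block in this test comes out at about 2060 bytes (derived from the
    // assertions below), so a 4200 byte limit is crossed while loading the third block of an
    // RPC; each RPC therefore carries roughly 3 blocks.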
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
    createTestData();
  }

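  // Evict all cached blocks before each test so every scan reads from disk and
  // countOfBlockBytesScanned stays deterministic regardless of test ordering.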
  @Before
  public void setupEach() throws Exception {
    HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
    for (HRegion region : regionServer.getRegions(TABLE)) {
      System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
      regionServer.clearRegionBlockCache(region);
    }
  }

  private static void createTestData() throws IOException, InterruptedException {
    RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
    String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
    HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);

    for (int i = 1; i < 10; i++) {
      // 6 columns per row in FAMILY1, plus 1 more in FAMILY2.
      // Each column value is 1000 bytes, so with key and overhead, 2 cells roughly fill a
      // 2048-byte block. That works out to about 3 blocks per row in FAMILY1.
      Put put = new Put(Bytes.toBytes(i));
      for (int j = 0; j < 6; j++) {
        put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
      }

      // Additional block in FAMILY2 (notably smaller than block size)
      put.addColumn(FAMILY2, COLUMN1, DATA);

      region.put(put);

      if (i % 2 == 0) {
        region.flush(true);
      }
    }

    // Flush the last row too; in total we've now created 10 storefiles, 5 per family.
    region.flush(true);

  }
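  // Resulting layout, derived from the numbers asserted below: FAMILY1 blocks hold 2 cells each
  // (~2060 bytes per full block), so each row spans about 3 blocks there. FAMILY2 has 1 small
  // cell per row, so 2 rows share a single block per storefile.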

  /**
   * Simplest test that ensures we don't over-count block sizes. The 2 requested cells are in the
   * same block, so they should be returned in 1 request. If we miscounted blocks, they'd come in
   * 2 requests.
   */
  @Test
  public void testSingleBlock() throws IOException {
    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner =
      table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
        .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM));

    ScanMetrics metrics = scanner.getScanMetrics();

    scanner.next(100);

    // we fetch 2 columns from 1 row, so about 2 blocks
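    // 2 blocks x 2060 bytes = 4120 bytes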
    assertEquals(4120, metrics.countOfBlockBytesScanned.get());
    assertEquals(1, metrics.countOfRowsScanned.get());
    assertEquals(1, metrics.countOfRPCcalls.get());
  }

  /**
   * Tests that we check the size limit after filterRowKey. When filterRowKey excludes a row, we
   * call nextRow() to skip to the next row. This should be efficient in this case, but we still
   * need to check size limits after each row is processed. So in this test, we accumulate some
   * block IO reading row 1, then skip row 2 and should return early at that point. The next RPC
   * call starts with row 3's blocks already loaded, so it can return the whole row in one RPC. If
   * we were not checking size limits, we'd have been able to load an extra row 3 cell into the
   * first RPC and thus split row 3 across multiple Results.
   */
  @Test
  public void testCheckLimitAfterFilterRowKey() throws IOException {

    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1)
      .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2)
      .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2)))));

    ScanMetrics metrics = scanner.getScanMetrics();

    boolean foundRow3 = false;
    for (Result result : scanner) {
      Set<Integer> rows = new HashSet<>();
      for (Cell cell : result.rawCells()) {
        rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
      }
      if (rows.contains(3)) {
        assertFalse("expected row3 to come all in one result, but found it in two results",
          foundRow3);
        assertEquals(1, rows.size());
        foundRow3 = true;
      }
    }

    // 22 blocks, the last one 1030 bytes (approx 3 per row for 8 rows, minus some block sharing
    // in family2, since each row has only 1 cell there and 2 fit per block)
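    // 21 full blocks x 2060 bytes + 1 final 1030 byte block = 44290 bytes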
    assertEquals(44290, metrics.countOfBlockBytesScanned.get());
    // We can return 22 blocks in 9 RPCs, but need an extra one to check for more rows at the end
    assertEquals(10, metrics.countOfRPCcalls.get());
  }

  /**
   * After RegionScannerImpl.populateResults, row filters are run. If the row is excluded due to
   * filter.filterRow(), nextRow() is called, which might accumulate more block IO. Validates that
   * in this case we still honor block limits.
   */
  @Test
  public void testCheckLimitAfterFilteringRowCellsDueToFilterRow() throws IOException {
    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true)
      .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)
      .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL,
        new BinaryComparator(Bytes.toBytes("dasfasf"))))));

    // Our filter doesn't end up matching any real columns, so expect only cursors
    for (Result result : scanner) {
      assertTrue(result.isCursor());
    }

    ScanMetrics metrics = scanner.getScanMetrics();

    // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total
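    // 9 blocks x 2060 bytes = 18540 bytes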
    assertEquals(18540, metrics.countOfBlockBytesScanned.get());
    // Limited to 4200 bytes per RPC, which is enough for 3 blocks (we exceed the limit after
    // loading the 3rd), so that's 3 RPCs plus a final RPC to return the cells loaded by the
    // last block.
    assertEquals(4, metrics.countOfRPCcalls.get());
  }

  /**
   * At the end of the loop in StoreScanner, we do one more check of size limits. This catches the
   * block size being exceeded while filtering cells within a store. This test ensures we do that;
   * otherwise we'd see no cursors below.
   */
  @Test
  public void testCheckLimitAfterFilteringCell() throws IOException {
    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner = table.getScanner(getBaseScan()
      .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2))));

    int cursors = 0;
    for (Result result : scanner) {
      if (result.isCursor()) {
        cursors++;
      }
    }
    ScanMetrics metrics = scanner.getScanMetrics();

    // 9 rows, total of 32 blocks (the last one is 1030 bytes)
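    // 31 full blocks x 2060 bytes + 1 final 1030 byte block = 64890 bytes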
    assertEquals(64890, metrics.countOfBlockBytesScanned.get());
    // We can return 32 blocks in approx 11 RPCs, but we need 2 cursors due to the narrow filter
    assertEquals(2, cursors);
    assertEquals(11, metrics.countOfRPCcalls.get());
  }

  /**
   * After RegionScannerImpl.populateResults, row filters are run. If the row's cells are removed
   * due to filter.filterRowCells(), we fall through to a final results.isEmpty() check near the
   * end of the method. If results are empty at this point (which they are), nextRow() is called,
   * which might accumulate more block IO. Validates that in this case we still honor block
   * limits.
   */
  @Test
  public void testCheckLimitAfterFilteringRowCells() throws IOException {
    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner = table
      .getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true).addColumn(FAMILY1, COLUMN1)
        .setReadType(Scan.ReadType.STREAM).setFilter(new SingleColumnValueExcludeFilter(FAMILY1,
          COLUMN1, CompareOperator.EQUAL, new BinaryComparator(DATA))));
    // Since we use SingleColumnValueExcludeFilter and don't include any other columns, the column
    // we load to test ends up being excluded from the result. So we only expect cursors here.
    for (Result result : scanner) {
      assertTrue(result.isCursor());
    }

    ScanMetrics metrics = scanner.getScanMetrics();

    // Our filter causes us to read the first column of each row, then INCLUDE_AND_SEEK_NEXT_ROW.
    // So we load 1 block per row, and there are 9 rows: 9 blocks.
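    // 9 blocks x 2060 bytes = 18540 bytes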
    assertEquals(18540, metrics.countOfBlockBytesScanned.get());
    // We can return 9 blocks in 3 RPCs, but need 1 more to check for more results (returns 0)
    assertEquals(4, metrics.countOfRPCcalls.get());
  }

  /**
   * Tests that when we seek over blocks, we don't include them in the block size of the request.
   */
  @Test
  public void testSeekNextUsingHint() throws IOException {
    Table table = TEST_UTIL.getConnection().getTable(TABLE);

    ResultScanner scanner = table.getScanner(
      getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5)));

    scanner.next(100);
    ScanMetrics metrics = scanner.getScanMetrics();

    // We have to read the first cell/block of each row, then can skip to the last block. So that's
    // 2 blocks per row to read (18 blocks total)
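    // 18 blocks x 2060 bytes = 37080 bytes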
    assertEquals(37080, metrics.countOfBlockBytesScanned.get());
    // Our max scan size is enough to read 3 blocks per RPC, plus one final RPC to finish region.
    assertEquals(7, metrics.countOfRPCcalls.get());
  }

  /**
   * We enable cursors and partial results to give us more granularity over counting of results,
   * and we enable STREAM so that no automatic switching from pread to stream occurs, since that
   * would throw off the RPC counts.
   */
  private Scan getBaseScan() {
    return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
      .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
  }
}