001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import org.apache.hadoop.hbase.client.Put;
028import org.apache.hadoop.hbase.client.Result;
029import org.apache.hadoop.hbase.client.ResultScanner;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
033import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
034import org.apache.hadoop.hbase.filter.BinaryComparator;
035import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
036import org.apache.hadoop.hbase.filter.Filter;
037import org.apache.hadoop.hbase.filter.FilterList;
038import org.apache.hadoop.hbase.filter.FilterList.Operator;
039import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
040import org.apache.hadoop.hbase.filter.RowFilter;
041import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
042import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
043import org.apache.hadoop.hbase.testclassification.LargeTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.jupiter.api.AfterAll;
046import org.junit.jupiter.api.BeforeAll;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.Test;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052@Tag(LargeTests.TAG)
053public class TestServerSideScanMetricsFromClientSide {
054  private static final Logger LOG =
055    LoggerFactory.getLogger(TestServerSideScanMetricsFromClientSide.class);
056
057  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
058
059  private static Table TABLE = null;
060
061  /**
062   * Table configuration
063   */
064  private static TableName TABLE_NAME = TableName.valueOf("testTable");
065
066  private static int NUM_ROWS = 10;
067  private static byte[] ROW = Bytes.toBytes("testRow");
068  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
069
070  // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then
071  // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which
072  // breaks the simple generation of expected kv's
073  private static int NUM_FAMILIES = 1;
074  private static byte[] FAMILY = Bytes.toBytes("testFamily");
075  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
076
077  private static int NUM_QUALIFIERS = 1;
078  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
079  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
080
081  private static int VALUE_SIZE = 10;
082  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
083
084  private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS;
085
086  // Approximation of how large the heap size of cells in our table. Should be accessed through
087  // getCellHeapSize().
088  private static long CELL_HEAP_SIZE = -1;
089
090  @BeforeAll
091  public static void setUpBeforeClass() throws Exception {
092    TEST_UTIL.startMiniCluster(3);
093    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
094  }
095
096  private static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
097    byte[][] qualifiers, byte[] cellValue) throws IOException {
098    Table ht = TEST_UTIL.createTable(name, families);
099    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
100    ht.put(puts);
101
102    return ht;
103  }
104
105  @AfterAll
106  public static void tearDownAfterClass() throws Exception {
107    TEST_UTIL.shutdownMiniCluster();
108  }
109
110  /**
111   * Make puts to put the input value into each combination of row, family, and qualifier
112   * @param rows       the rows to use
113   * @param families   the column families to use
114   * @param qualifiers the column qualifiers to use
115   * @param value      the value to put
116   * @return the putted input values added in puts
117   * @throws IOException If an IO problem is encountered
118   */
119  private static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
120    byte[] value) throws IOException {
121    Put put;
122    ArrayList<Put> puts = new ArrayList<>();
123
124    for (int row = 0; row < rows.length; row++) {
125      put = new Put(rows[row]);
126      for (int fam = 0; fam < families.length; fam++) {
127        for (int qual = 0; qual < qualifiers.length; qual++) {
128          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
129          put.add(kv);
130        }
131      }
132      puts.add(put);
133    }
134
135    return puts;
136  }
137
138  /**
139   * @return The approximate heap size of a cell in the test table. All cells should have
140   *         approximately the same heap size, so the value is cached to avoid repeating the
141   *         calculation
142   * @throws Exception on unexpected failure
143   */
144  private long getCellHeapSize() throws Exception {
145    if (CELL_HEAP_SIZE == -1) {
146      // Do a partial scan that will return a single result with a single cell
147      Scan scan = new Scan();
148      scan.setMaxResultSize(1);
149      scan.setAllowPartialResults(true);
150      ResultScanner scanner = TABLE.getScanner(scan);
151
152      Result result = scanner.next();
153
154      assertTrue(result != null);
155      assertTrue(result.rawCells() != null);
156      assertTrue(result.rawCells().length == 1);
157
158      CELL_HEAP_SIZE = result.rawCells()[0].heapSize();
159      scanner.close();
160    }
161
162    return CELL_HEAP_SIZE;
163  }
164
165  @Test
166  public void testRowsSeenMetric() throws Exception {
167    // Base scan configuration
168    Scan baseScan;
169    baseScan = new Scan();
170    baseScan.setScanMetricsEnabled(true);
171    try {
172      testRowsSeenMetric(baseScan);
173
174      // Test case that only a single result will be returned per RPC to the serer
175      baseScan.setCaching(1);
176      testRowsSeenMetric(baseScan);
177
178      // Test case that partial results are returned from the server. At most one cell will be
179      // contained in each response
180      baseScan.setMaxResultSize(1);
181      testRowsSeenMetric(baseScan);
182
183      // Test case that size limit is set such that a few cells are returned per partial result from
184      // the server
185      baseScan.setCaching(NUM_ROWS);
186      baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
187      testRowsSeenMetric(baseScan);
188    } catch (Throwable t) {
189      LOG.error("FAIL", t);
190      throw t;
191    }
192  }
193
194  @Test
195  public void testFsReadTimeMetric() throws Exception {
196    // write some new puts and flush, as an easy way to ensure the read blocks are not cached
197    // so that we go into the fs write code path
198    List<Put> puts = createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE);
199    TABLE.put(puts);
200    TEST_UTIL.flush(TABLE_NAME);
201    Scan scan = new Scan();
202    scan.setScanMetricsEnabled(true);
203    testMetric(scan, ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME, 0, CompareOperator.GREATER);
204  }
205
206  private void testRowsSeenMetric(Scan baseScan) throws Exception {
207    Scan scan;
208    scan = new Scan(baseScan);
209    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, NUM_ROWS);
210
211    for (int i = 0; i < ROWS.length - 1; i++) {
212      scan = new Scan(baseScan);
213      scan.withStartRow(ROWS[0]);
214      scan.withStopRow(ROWS[i + 1]);
215      testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
216    }
217
218    for (int i = ROWS.length - 1; i > 0; i--) {
219      scan = new Scan(baseScan);
220      scan.withStartRow(ROWS[i - 1]);
221      scan.withStopRow(ROWS[ROWS.length - 1]);
222      testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME,
223        ROWS.length - i);
224    }
225
226    // The filter should filter out all rows, but we still expect to see every row.
227    Filter filter =
228      new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz")));
229    scan = new Scan(baseScan);
230    scan.setFilter(filter);
231    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);
232
233    // Filter should pass on all rows
234    SingleColumnValueFilter singleColumnValueFilter =
235      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE);
236    scan = new Scan(baseScan);
237    scan.setFilter(singleColumnValueFilter);
238    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);
239
240    // Filter should filter out all rows
241    singleColumnValueFilter =
242      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
243    scan = new Scan(baseScan);
244    scan.setFilter(singleColumnValueFilter);
245    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length);
246  }
247
248  @Test
249  public void testRowsFilteredMetric() throws Exception {
250    // Base scan configuration
251    Scan baseScan;
252    baseScan = new Scan();
253    baseScan.setScanMetricsEnabled(true);
254
255    // Test case where scan uses default values
256    testRowsFilteredMetric(baseScan);
257
258    // Test case where at most one Result is retrieved per RPC
259    baseScan.setCaching(1);
260    testRowsFilteredMetric(baseScan);
261
262    // Test case where size limit is very restrictive and partial results will be returned from
263    // server
264    baseScan.setMaxResultSize(1);
265    testRowsFilteredMetric(baseScan);
266
267    // Test a case where max result size limits response from server to only a few cells (not all
268    // cells from the row)
269    baseScan.setCaching(NUM_ROWS);
270    baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1));
271    testRowsSeenMetric(baseScan);
272  }
273
274  private void testRowsFilteredMetric(Scan baseScan) throws Exception {
275    testRowsFilteredMetric(baseScan, null, 0);
276
277    // Row filter doesn't match any row key. All rows should be filtered
278    Filter filter =
279      new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz")));
280    testRowsFilteredMetric(baseScan, filter, ROWS.length);
281
282    // Filter will return results containing only the first key. Number of entire rows filtered
283    // should be 0.
284    filter = new FirstKeyOnlyFilter();
285    testRowsFilteredMetric(baseScan, filter, 0);
286
287    // Column prefix will find some matching qualifier on each row. Number of entire rows filtered
288    // should be 0
289    filter = new ColumnPrefixFilter(QUALIFIERS[0]);
290    testRowsFilteredMetric(baseScan, filter, 0);
291
292    // Column prefix will NOT find any matching qualifier on any row. All rows should be filtered
293    filter = new ColumnPrefixFilter(Bytes.toBytes("xyz"));
294    testRowsFilteredMetric(baseScan, filter, ROWS.length);
295
296    // Matching column value should exist in each row. No rows should be filtered.
297    filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE);
298    testRowsFilteredMetric(baseScan, filter, 0);
299
300    // No matching column value should exist in any row. Filter all rows
301    filter =
302      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
303    testRowsFilteredMetric(baseScan, filter, ROWS.length);
304
305    List<Filter> filters = new ArrayList<>();
306    filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[0])));
307    filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[3])));
308    int numberOfMatchingRowFilters = filters.size();
309    filter = new FilterList(Operator.MUST_PASS_ONE, filters);
310    testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters);
311    filters.clear();
312
313    // Add a single column value exclude filter for each column... The net effect is that all
314    // columns will be excluded when scanning on the server side. This will result in an empty cell
315    // array in RegionScanner#nextInternal which should be interpreted as a row being filtered.
316    for (int family = 0; family < FAMILIES.length; family++) {
317      for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) {
318        filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier],
319          CompareOperator.EQUAL, VALUE));
320      }
321    }
322    filter = new FilterList(Operator.MUST_PASS_ONE, filters);
323    testRowsFilteredMetric(baseScan, filter, ROWS.length);
324  }
325
326  private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered)
327    throws Exception {
328    Scan scan = new Scan(baseScan);
329    if (filter != null) {
330      scan.setFilter(filter);
331    }
332    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME,
333      expectedNumFiltered);
334  }
335
336  /**
337   * Run the scan to completetion and check the metric against the specified value
338   * @param scan          The scan instance to use to record metrics
339   * @param metricKey     The metric key name
340   * @param expectedValue The expected value of metric
341   * @throws Exception on unexpected failure
342   */
343  private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
344    testMetric(scan, metricKey, expectedValue, CompareOperator.EQUAL);
345  }
346
347  private void testMetric(Scan scan, String metricKey, long expectedValue,
348    CompareOperator compareOperator) throws Exception {
349    assertTrue(scan.isScanMetricsEnabled(), "Scan should be configured to record metrics");
350    ResultScanner scanner = TABLE.getScanner(scan);
351    // Iterate through all the results
352    while (scanner.next() != null) {
353      continue;
354    }
355    scanner.close();
356    ScanMetrics metrics = scanner.getScanMetrics();
357    assertNotNull(metrics, "Metrics are null");
358    assertTrue(metrics.hasCounter(metricKey), "Metric : " + metricKey + " does not exist");
359    final long actualMetricValue = metrics.getCounter(metricKey).get();
360    if (compareOperator == CompareOperator.EQUAL) {
361      assertEquals(expectedValue, actualMetricValue,
362        "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue);
363    } else {
364      assertTrue(actualMetricValue > expectedValue,
365        "Metric: " + metricKey + " Expected: > " + expectedValue + " Actual: " + actualMetricValue);
366    }
367  }
368}