001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import org.apache.hadoop.hbase.client.Put; 027import org.apache.hadoop.hbase.client.Result; 028import org.apache.hadoop.hbase.client.ResultScanner; 029import org.apache.hadoop.hbase.client.Scan; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 032import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; 033import org.apache.hadoop.hbase.filter.BinaryComparator; 034import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; 035import org.apache.hadoop.hbase.filter.Filter; 036import org.apache.hadoop.hbase.filter.FilterList; 037import org.apache.hadoop.hbase.filter.FilterList.Operator; 038import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 039import org.apache.hadoop.hbase.filter.RowFilter; 040import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; 041import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 042import org.apache.hadoop.hbase.testclassification.LargeTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.AfterClass; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052@Category(LargeTests.class) 053public class TestServerSideScanMetricsFromClientSide { 054 private static final Logger LOG = 055 LoggerFactory.getLogger(TestServerSideScanMetricsFromClientSide.class); 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestServerSideScanMetricsFromClientSide.class); 060 061 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 062 063 private static Table TABLE = null; 064 065 /** 066 * Table configuration 067 */ 068 private static TableName TABLE_NAME = TableName.valueOf("testTable"); 069 070 private static int NUM_ROWS = 10; 071 private static byte[] ROW = Bytes.toBytes("testRow"); 072 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); 073 074 // Should keep this value below 10 to keep generation of expected kv's simple. If above 10 then 075 // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which 076 // breaks the simple generation of expected kv's 077 private static int NUM_FAMILIES = 1; 078 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 079 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); 080 081 private static int NUM_QUALIFIERS = 1; 082 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); 083 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); 084 085 private static int VALUE_SIZE = 10; 086 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); 087 088 private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS; 089 090 // Approximation of how large the heap size of cells in our table. Should be accessed through 091 // getCellHeapSize(). 092 private static long CELL_HEAP_SIZE = -1; 093 094 @BeforeClass 095 public static void setUpBeforeClass() throws Exception { 096 TEST_UTIL.startMiniCluster(3); 097 TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); 098 } 099 100 private static Table createTestTable(TableName name, byte[][] rows, byte[][] families, 101 byte[][] qualifiers, byte[] cellValue) throws IOException { 102 Table ht = TEST_UTIL.createTable(name, families); 103 List<Put> puts = createPuts(rows, families, qualifiers, cellValue); 104 ht.put(puts); 105 106 return ht; 107 } 108 109 @AfterClass 110 public static void tearDownAfterClass() throws Exception { 111 TEST_UTIL.shutdownMiniCluster(); 112 } 113 114 /** 115 * Make puts to put the input value into each combination of row, family, and qualifier 116 * @param rows the rows to use 117 * @param families the column families to use 118 * @param qualifiers the column qualifiers to use 119 * @param value the value to put 120 * @return the putted input values added in puts 121 * @throws IOException If an IO problem is encountered 122 */ 123 private static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, 124 byte[] value) throws IOException { 125 Put put; 126 ArrayList<Put> puts = new ArrayList<>(); 127 128 for (int row = 0; row < rows.length; row++) { 129 put = new Put(rows[row]); 130 for (int fam = 0; fam < families.length; fam++) { 131 for (int qual = 0; qual < qualifiers.length; qual++) { 132 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); 133 put.add(kv); 134 } 135 } 136 puts.add(put); 137 } 138 139 return puts; 140 } 141 142 /** 143 * @return The approximate heap size of a cell in the test table. All cells should have 144 * approximately the same heap size, so the value is cached to avoid repeating the 145 * calculation 146 * @throws Exception on unexpected failure 147 */ 148 private long getCellHeapSize() throws Exception { 149 if (CELL_HEAP_SIZE == -1) { 150 // Do a partial scan that will return a single result with a single cell 151 Scan scan = new Scan(); 152 scan.setMaxResultSize(1); 153 scan.setAllowPartialResults(true); 154 ResultScanner scanner = TABLE.getScanner(scan); 155 156 Result result = scanner.next(); 157 158 assertTrue(result != null); 159 assertTrue(result.rawCells() != null); 160 assertTrue(result.rawCells().length == 1); 161 162 CELL_HEAP_SIZE = result.rawCells()[0].heapSize(); 163 scanner.close(); 164 } 165 166 return CELL_HEAP_SIZE; 167 } 168 169 @Test 170 public void testRowsSeenMetric() throws Exception { 171 // Base scan configuration 172 Scan baseScan; 173 baseScan = new Scan(); 174 baseScan.setScanMetricsEnabled(true); 175 try { 176 testRowsSeenMetric(baseScan); 177 178 // Test case that only a single result will be returned per RPC to the serer 179 baseScan.setCaching(1); 180 testRowsSeenMetric(baseScan); 181 182 // Test case that partial results are returned from the server. At most one cell will be 183 // contained in each response 184 baseScan.setMaxResultSize(1); 185 testRowsSeenMetric(baseScan); 186 187 // Test case that size limit is set such that a few cells are returned per partial result from 188 // the server 189 baseScan.setCaching(NUM_ROWS); 190 baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); 191 testRowsSeenMetric(baseScan); 192 } catch (Throwable t) { 193 LOG.error("FAIL", t); 194 throw t; 195 } 196 } 197 198 private void testRowsSeenMetric(Scan baseScan) throws Exception { 199 Scan scan; 200 scan = new Scan(baseScan); 201 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, NUM_ROWS); 202 203 for (int i = 0; i < ROWS.length - 1; i++) { 204 scan = new Scan(baseScan); 205 scan.withStartRow(ROWS[0]); 206 scan.withStopRow(ROWS[i + 1]); 207 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1); 208 } 209 210 for (int i = ROWS.length - 1; i > 0; i--) { 211 scan = new Scan(baseScan); 212 scan.withStartRow(ROWS[i - 1]); 213 scan.withStopRow(ROWS[ROWS.length - 1]); 214 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 215 ROWS.length - i); 216 } 217 218 // The filter should filter out all rows, but we still expect to see every row. 219 Filter filter = 220 new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz"))); 221 scan = new Scan(baseScan); 222 scan.setFilter(filter); 223 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length); 224 225 // Filter should pass on all rows 226 SingleColumnValueFilter singleColumnValueFilter = 227 new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE); 228 scan = new Scan(baseScan); 229 scan.setFilter(singleColumnValueFilter); 230 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length); 231 232 // Filter should filter out all rows 233 singleColumnValueFilter = 234 new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); 235 scan = new Scan(baseScan); 236 scan.setFilter(singleColumnValueFilter); 237 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length); 238 } 239 240 @Test 241 public void testRowsFilteredMetric() throws Exception { 242 // Base scan configuration 243 Scan baseScan; 244 baseScan = new Scan(); 245 baseScan.setScanMetricsEnabled(true); 246 247 // Test case where scan uses default values 248 testRowsFilteredMetric(baseScan); 249 250 // Test case where at most one Result is retrieved per RPC 251 baseScan.setCaching(1); 252 testRowsFilteredMetric(baseScan); 253 254 // Test case where size limit is very restrictive and partial results will be returned from 255 // server 256 baseScan.setMaxResultSize(1); 257 testRowsFilteredMetric(baseScan); 258 259 // Test a case where max result size limits response from server to only a few cells (not all 260 // cells from the row) 261 baseScan.setCaching(NUM_ROWS); 262 baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); 263 testRowsSeenMetric(baseScan); 264 } 265 266 private void testRowsFilteredMetric(Scan baseScan) throws Exception { 267 testRowsFilteredMetric(baseScan, null, 0); 268 269 // Row filter doesn't match any row key. All rows should be filtered 270 Filter filter = 271 new RowFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("xyz"))); 272 testRowsFilteredMetric(baseScan, filter, ROWS.length); 273 274 // Filter will return results containing only the first key. Number of entire rows filtered 275 // should be 0. 276 filter = new FirstKeyOnlyFilter(); 277 testRowsFilteredMetric(baseScan, filter, 0); 278 279 // Column prefix will find some matching qualifier on each row. Number of entire rows filtered 280 // should be 0 281 filter = new ColumnPrefixFilter(QUALIFIERS[0]); 282 testRowsFilteredMetric(baseScan, filter, 0); 283 284 // Column prefix will NOT find any matching qualifier on any row. All rows should be filtered 285 filter = new ColumnPrefixFilter(Bytes.toBytes("xyz")); 286 testRowsFilteredMetric(baseScan, filter, ROWS.length); 287 288 // Matching column value should exist in each row. No rows should be filtered. 289 filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.EQUAL, VALUE); 290 testRowsFilteredMetric(baseScan, filter, 0); 291 292 // No matching column value should exist in any row. Filter all rows 293 filter = 294 new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); 295 testRowsFilteredMetric(baseScan, filter, ROWS.length); 296 297 List<Filter> filters = new ArrayList<>(); 298 filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[0]))); 299 filters.add(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROWS[3]))); 300 int numberOfMatchingRowFilters = filters.size(); 301 filter = new FilterList(Operator.MUST_PASS_ONE, filters); 302 testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters); 303 filters.clear(); 304 305 // Add a single column value exclude filter for each column... The net effect is that all 306 // columns will be excluded when scanning on the server side. This will result in an empty cell 307 // array in RegionScanner#nextInternal which should be interpreted as a row being filtered. 308 for (int family = 0; family < FAMILIES.length; family++) { 309 for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) { 310 filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier], 311 CompareOperator.EQUAL, VALUE)); 312 } 313 } 314 filter = new FilterList(Operator.MUST_PASS_ONE, filters); 315 testRowsFilteredMetric(baseScan, filter, ROWS.length); 316 } 317 318 private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered) 319 throws Exception { 320 Scan scan = new Scan(baseScan); 321 if (filter != null) { 322 scan.setFilter(filter); 323 } 324 testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 325 expectedNumFiltered); 326 } 327 328 /** 329 * Run the scan to completetion and check the metric against the specified value 330 * @param scan The scan instance to use to record metrics 331 * @param metricKey The metric key name 332 * @param expectedValue The expected value of metric 333 * @throws Exception on unexpected failure 334 */ 335 private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { 336 assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); 337 ResultScanner scanner = TABLE.getScanner(scan); 338 // Iterate through all the results 339 while (scanner.next() != null) { 340 continue; 341 } 342 scanner.close(); 343 ScanMetrics metrics = scanner.getScanMetrics(); 344 assertTrue("Metrics are null", metrics != null); 345 assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); 346 final long actualMetricValue = metrics.getCounter(metricKey).get(); 347 assertEquals( 348 "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, 349 expectedValue, actualMetricValue); 350 } 351}