/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Verifies the {@link ScanMetrics} reported by the asynchronous table scan APIs.
 * <p>
 * Every test is run once per scan flavour supplied by {@link #parameters()}: a raw async table
 * scan, an async table scan with a result consumer, and an async table {@link ResultScanner}. All
 * flavours scan the same three-row, multi-region table created in {@link #setUp()}.
 */
@Tag(MediumTests.TAG)
@Tag(ClientTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
public class TestAsyncTableScanMetrics {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  private static AsyncConnection CONN;

  /** Number of regions of {@link #TABLE_NAME}, captured once the table has been created. */
  private static int NUM_REGIONS;

  /** Runs a scan and hands back both the rows read and the metrics the client collected. */
  @FunctionalInterface
  private interface ScanWithMetrics {
    Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
  }

  /** The scan flavour under test; assigned once by the parameterized constructor. */
  private final ScanWithMetrics method;

  // methodName is just for naming
  public TestAsyncTableScanMetrics(String methodName, ScanWithMetrics method) {
    this.method = method;
  }

  /**
   * Supplies one (display name, scan implementation) pair per scan flavour.
   */
  public static Stream<Arguments> parameters() {
    ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
    ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
    ScanWithMetrics doScanWithAsyncTableScanner =
      TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
    return Stream.of(Arguments.of("doScanWithRawAsyncTable", doScanWithRawAsyncTable),
      Arguments.of("doScanWithAsyncTableScan", doScanWithAsyncTableScan),
      Arguments.of("doScanWithAsyncTableScanner", doScanWithAsyncTableScanner));
  }

  /**
   * Starts the mini cluster, creates and populates the test table, opens the shared async
   * connection and records how many regions the table has.
   */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(3);
    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
    // scan hits all the region and not all rows lie in a single region
    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
    }
    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
    NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
  }

  /** Closes the shared connection (swallowing any IOException) and stops the mini cluster. */
  @AfterAll
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Scans via the table returned by {@code getTable(TableName)} and drains a
   * {@link BufferingScanResultConsumer} until it yields {@code null}.
   */
  private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
    throws IOException, InterruptedException {
    BufferingScanResultConsumer consumer = new BufferingScanResultConsumer();
    CONN.getTable(TABLE_NAME).scan(scan, consumer);
    List<Result> results = new ArrayList<>();
    for (Result result; (result = consumer.take()) != null;) {
      results.add(result);
    }
    return Pair.newPair(results, consumer.getScanMetrics());
  }

  /**
   * Scans via the table bound to the common fork-join pool, collecting rows and metrics through a
   * {@link SimpleScanResultConsumerImpl}.
   */
  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
    throws Exception {
    SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl();
    CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
    return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
  }

  /**
   * Scans via a {@link ResultScanner} obtained from the table bound to the common fork-join pool,
   * reading until {@code next()} returns {@code null}. The scanner is closed before returning.
   */
  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
    throws IOException {
    try (ResultScanner scanner =
      CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
      List<Result> results = new ArrayList<>();
      for (Result result; (result = scanner.next()) != null;) {
        results.add(result);
      }
      return Pair.newPair(results, scanner.getScanMetrics());
    }
  }

  /** A default {@link Scan} must return all rows and no metrics object at all. */
  @TestTemplate
  public void testScanMetricsDisabled() throws Exception {
    Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
    assertEquals(3, pair.getFirst().size());
    // Assert no scan metrics
    assertNull(pair.getSecond());
  }

  /**
   * With only overall scan metrics enabled, the aggregate counters must be populated while the
   * per-region breakdown stays empty.
   */
  @TestTemplate
  public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
    Scan scan = new Scan();
    scan.setScanMetricsEnabled(true);
    long startNanos = System.nanoTime();
    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
    long endNanos = System.nanoTime();
    List<Result> results = pair.getFirst();
    assertEquals(3, results.size());
    long bytes = getBytesOfResults(results);
    ScanMetrics scanMetrics = pair.getSecond();
    // Fail with a readable message instead of a NullPointerException below
    assertNotNull(scanMetrics, "scan metrics should be collected when enabled on the scan");
    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
    // Assert scan metrics have not been collected by region
    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
    // Time between nexts must be positive but bounded by the wall-clock time of the whole scan
    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
      both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos))));
  }

  /**
   * Scans exactly the row "zzz1" with per-region metrics enabled and checks that the single
   * per-region entry matches the overall metrics.
   */
  @TestTemplate
  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("zzz1"), true);
    scan.withStopRow(Bytes.toBytes("zzz1"), true);
    scan.setEnableScanMetricsByRegion(true);
    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
    List<Result> results = pair.getFirst();
    assertEquals(1, results.size());
    long bytes = getBytesOfResults(results);
    ScanMetrics scanMetrics = pair.getSecond();
    // Fail with a readable message instead of a NullPointerException below
    assertNotNull(scanMetrics, "scan metrics should be collected when enabled by region");
    assertEquals(1, scanMetrics.countOfRegions.get());
    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
    assertEquals(1, scanMetrics.countOfRPCcalls.get());
    // Assert scan metrics by region were collected for the region scanned
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    assertEquals(1, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo smri = entry.getKey();
      Map<String, Long> metrics = entry.getValue();
      assertNotNull(smri.getServerName());
      assertNotNull(smri.getEncodedRegionName());
      // Assert overall scan metrics and scan metrics by region should be equal as only 1 region
      // was scanned.
      assertEquals(scanMetrics.getMetricsMap(false), metrics);
    }
    // we only have 1 rpc call so there is no millis 'between nexts'
    assertEquals(0, scanMetrics.sumOfMillisSecBetweenNexts.get());
  }

  /**
   * Scans the whole table with per-region metrics enabled and checks both the overall counters
   * and the entry reported for every region.
   */
  @TestTemplate
  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
    Scan scan = new Scan();
    scan.setEnableScanMetricsByRegion(true);
    long startNanos = System.nanoTime();
    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
    long endNanos = System.nanoTime();
    List<Result> results = pair.getFirst();
    assertEquals(3, results.size());
    long bytes = getBytesOfResults(results);
    ScanMetrics scanMetrics = pair.getSecond();
    // Fail with a readable message instead of a NullPointerException below
    assertNotNull(scanMetrics, "scan metrics should be collected when enabled by region");
    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
    assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
    assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
    assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
    // Assert scan metrics by region were collected for the region scanned
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
    // The first row's size is used as the expected byte count of every region holding one row;
    // the rows written in setUp() differ only in same-length row keys.
    long bytesPerRow = getBytesOfResults(Collections.singletonList(results.get(0)));
    int rowsScannedAcrossAllRegions = 0;
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo smri = entry.getKey();
      Map<String, Long> perRegionMetrics = entry.getValue();
      assertNotNull(smri.getServerName());
      assertNotNull(smri.getEncodedRegionName());
      assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
      if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
        assertEquals(bytesPerRow, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
        rowsScannedAcrossAllRegions++;
      } else {
        assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
        assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
      }
    }
    assertEquals(3, rowsScannedAcrossAllRegions);
    // Time between nexts must be positive but bounded by the wall-clock time of the whole scan
    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
      both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos))));
  }

  /**
   * Sums {@link PrivateCellUtil#estimatedSerializedSizeOf} over every cell of every result.
   * @param results the results whose cells are measured
   * @return the total estimated serialized size of all cells, 0 for an empty list
   */
  static long getBytesOfResults(List<Result> results) {
    return results.stream().flatMap(r -> Arrays.stream(r.rawCells()))
      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
  }
}