001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME; 021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; 022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; 023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertTrue; 028 029import java.io.IOException; 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Collections; 033import java.util.List; 034import java.util.Map; 035import java.util.concurrent.ForkJoinPool; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.PrivateCellUtil; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 041import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; 042import org.apache.hadoop.hbase.testclassification.ClientTests; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.util.Bytes; 045import org.apache.hadoop.hbase.util.Pair; 046import org.junit.AfterClass; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.junit.runner.RunWith; 052import org.junit.runners.Parameterized; 053import org.junit.runners.Parameterized.Parameter; 054import org.junit.runners.Parameterized.Parameters; 055 056import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 057 058@RunWith(Parameterized.class) 059@Category({ MediumTests.class, ClientTests.class }) 060public class TestAsyncTableScanMetrics { 061 062 @ClassRule 063 public static final HBaseClassTestRule CLASS_RULE = 064 HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class); 065 066 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 067 068 private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics"); 069 070 private static final byte[] CF = Bytes.toBytes("cf"); 071 072 private static final byte[] CQ = Bytes.toBytes("cq"); 073 074 private static final byte[] VALUE = Bytes.toBytes("value"); 075 076 private static AsyncConnection CONN; 077 078 private static int NUM_REGIONS; 079 080 @FunctionalInterface 081 private interface ScanWithMetrics { 082 Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception; 083 } 084 085 @Parameter(0) 086 public String methodName; 087 088 @Parameter(1) 089 public ScanWithMetrics method; 090 091 @Parameters(name = "{index}: scan={0}") 092 public static List<Object[]> params() { 093 ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable; 094 ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan; 095 ScanWithMetrics doScanWithAsyncTableScanner = 096 TestAsyncTableScanMetrics::doScanWithAsyncTableScanner; 097 return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable }, 098 new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan }, 099 new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner }); 100 } 101 102 @BeforeClass 103 public static void setUp() throws Exception { 104 UTIL.startMiniCluster(3); 105 // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that 106 // scan hits all the region and not all rows lie in a single region 107 try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 108 table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), 109 new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), 110 new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); 111 } 112 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 113 NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); 114 } 115 116 @AfterClass 117 public static void tearDown() throws Exception { 118 Closeables.close(CONN, true); 119 UTIL.shutdownMiniCluster(); 120 } 121 122 private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan) 123 throws IOException, InterruptedException { 124 BufferingScanResultConsumer consumer = new BufferingScanResultConsumer(); 125 CONN.getTable(TABLE_NAME).scan(scan, consumer); 126 List<Result> results = new ArrayList<>(); 127 for (Result result; (result = consumer.take()) != null;) { 128 results.add(result); 129 } 130 return Pair.newPair(results, consumer.getScanMetrics()); 131 } 132 133 private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan) 134 throws Exception { 135 SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl(); 136 CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer); 137 return Pair.newPair(consumer.getAll(), consumer.getScanMetrics()); 138 } 139 140 private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan) 141 throws IOException { 142 try (ResultScanner scanner = 143 CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) { 144 List<Result> results = new ArrayList<>(); 145 for (Result result; (result = scanner.next()) != null;) { 146 results.add(result); 147 } 148 return Pair.newPair(results, scanner.getScanMetrics()); 149 } 150 } 151 152 @Test 153 public void testScanMetricsDisabled() throws Exception { 154 Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan()); 155 assertEquals(3, pair.getFirst().size()); 156 // Assert no scan metrics 157 assertNull(pair.getSecond()); 158 } 159 160 @Test 161 public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception { 162 Scan scan = new Scan(); 163 scan.setScanMetricsEnabled(true); 164 Pair<List<Result>, ScanMetrics> pair = method.scan(scan); 165 List<Result> results = pair.getFirst(); 166 assertEquals(3, results.size()); 167 long bytes = getBytesOfResults(results); 168 ScanMetrics scanMetrics = pair.getSecond(); 169 assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); 170 assertEquals(bytes, scanMetrics.countOfBytesInResults.get()); 171 assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get()); 172 // Assert scan metrics have not been collected by region 173 assertTrue(scanMetrics.collectMetricsByRegion().isEmpty()); 174 } 175 176 @Test 177 public void testScanMetricsByRegionForSingleRegionScan() throws Exception { 178 Scan scan = new Scan(); 179 scan.withStartRow(Bytes.toBytes("zzz1"), true); 180 scan.withStopRow(Bytes.toBytes("zzz1"), true); 181 scan.setEnableScanMetricsByRegion(true); 182 Pair<List<Result>, ScanMetrics> pair = method.scan(scan); 183 List<Result> results = pair.getFirst(); 184 assertEquals(1, results.size()); 185 long bytes = getBytesOfResults(results); 186 ScanMetrics scanMetrics = pair.getSecond(); 187 assertEquals(1, scanMetrics.countOfRegions.get()); 188 assertEquals(bytes, scanMetrics.countOfBytesInResults.get()); 189 assertEquals(1, scanMetrics.countOfRPCcalls.get()); 190 // Assert scan metrics by region were collected for the region scanned 191 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 192 scanMetrics.collectMetricsByRegion(false); 193 assertEquals(1, scanMetricsByRegion.size()); 194 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 195 .entrySet()) { 196 ScanMetricsRegionInfo smri = entry.getKey(); 197 Map<String, Long> metrics = entry.getValue(); 198 assertNotNull(smri.getServerName()); 199 assertNotNull(smri.getEncodedRegionName()); 200 // Assert overall scan metrics and scan metrics by region should be equal as only 1 region 201 // was scanned. 202 assertEquals(scanMetrics.getMetricsMap(false), metrics); 203 } 204 } 205 206 @Test 207 public void testScanMetricsByRegionForMultiRegionScan() throws Exception { 208 Scan scan = new Scan(); 209 scan.setEnableScanMetricsByRegion(true); 210 Pair<List<Result>, ScanMetrics> pair = method.scan(scan); 211 List<Result> results = pair.getFirst(); 212 assertEquals(3, results.size()); 213 long bytes = getBytesOfResults(results); 214 ScanMetrics scanMetrics = pair.getSecond(); 215 Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false); 216 assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME)); 217 assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); 218 assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME)); 219 assertEquals(bytes, scanMetrics.countOfBytesInResults.get()); 220 assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME)); 221 assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get()); 222 // Assert scan metrics by region were collected for the region scanned 223 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 224 scanMetrics.collectMetricsByRegion(false); 225 assertEquals(NUM_REGIONS, scanMetricsByRegion.size()); 226 int rowsScannedAcrossAllRegions = 0; 227 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 228 .entrySet()) { 229 ScanMetricsRegionInfo smri = entry.getKey(); 230 Map<String, Long> perRegionMetrics = entry.getValue(); 231 assertNotNull(smri.getServerName()); 232 assertNotNull(smri.getEncodedRegionName()); 233 assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME)); 234 if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) { 235 bytes = getBytesOfResults(Collections.singletonList(results.get(0))); 236 assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME)); 237 rowsScannedAcrossAllRegions++; 238 } else { 239 assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 240 assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME)); 241 } 242 } 243 assertEquals(3, rowsScannedAcrossAllRegions); 244 } 245 246 static long getBytesOfResults(List<Result> results) { 247 return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream()) 248 .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum(); 249 } 250}