001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.List; 027import java.util.concurrent.ForkJoinPool; 028import org.apache.commons.io.IOUtils; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseTestingUtility; 031import org.apache.hadoop.hbase.PrivateCellUtil; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 034import org.apache.hadoop.hbase.testclassification.ClientTests; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Pair; 038import org.junit.AfterClass; 039import org.junit.BeforeClass; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.junit.runner.RunWith; 044import org.junit.runners.Parameterized; 045import org.junit.runners.Parameterized.Parameter; 046import org.junit.runners.Parameterized.Parameters; 047 048@RunWith(Parameterized.class) 049@Category({ MediumTests.class, ClientTests.class }) 050public class TestAsyncTableScanMetrics { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class); 055 056 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 057 058 private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics"); 059 060 private static final byte[] CF = Bytes.toBytes("cf"); 061 062 private static final byte[] CQ = Bytes.toBytes("cq"); 063 064 private static final byte[] VALUE = Bytes.toBytes("value"); 065 066 private static AsyncConnection CONN; 067 068 private static int NUM_REGIONS; 069 070 @FunctionalInterface 071 private interface ScanWithMetrics { 072 Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception; 073 } 074 075 @Parameter(0) 076 public String methodName; 077 078 @Parameter(1) 079 public ScanWithMetrics method; 080 081 @Parameters(name = "{index}: scan={0}") 082 public static List<Object[]> params() { 083 ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable; 084 ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan; 085 ScanWithMetrics doScanWithAsyncTableScanner = 086 TestAsyncTableScanMetrics::doScanWithAsyncTableScanner; 087 return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable }, 088 new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan }, 089 new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner }); 090 } 091 092 @BeforeClass 093 public static void setUp() throws Exception { 094 UTIL.startMiniCluster(3); 095 // Create 3 rows in the table, with rowkeys starting with "zzz*" so that 096 // scan are forced to hit all the regions. 097 try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 098 table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE), 099 new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE), 100 new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE))); 101 } 102 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 103 NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); 104 } 105 106 @AfterClass 107 public static void tearDown() throws Exception { 108 IOUtils.closeQuietly(CONN); 109 UTIL.shutdownMiniCluster(); 110 } 111 112 private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan) 113 throws IOException, InterruptedException { 114 BufferingScanResultConsumer consumer = new BufferingScanResultConsumer(); 115 CONN.getTable(TABLE_NAME).scan(scan, consumer); 116 List<Result> results = new ArrayList<>(); 117 for (Result result; (result = consumer.take()) != null;) { 118 results.add(result); 119 } 120 return Pair.newPair(results, consumer.getScanMetrics()); 121 } 122 123 private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan) 124 throws Exception { 125 SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); 126 CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer); 127 return Pair.newPair(consumer.getAll(), consumer.getScanMetrics()); 128 } 129 130 private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan) 131 throws IOException { 132 try (ResultScanner scanner = 133 CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) { 134 List<Result> results = new ArrayList<>(); 135 for (Result result; (result = scanner.next()) != null;) { 136 results.add(result); 137 } 138 return Pair.newPair(results, scanner.getScanMetrics()); 139 } 140 } 141 142 @Test 143 public void testNoScanMetrics() throws Exception { 144 Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan()); 145 assertEquals(3, pair.getFirst().size()); 146 assertNull(pair.getSecond()); 147 } 148 149 @Test 150 public void testScanMetrics() throws Exception { 151 Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true)); 152 List<Result> results = pair.getFirst(); 153 assertEquals(3, results.size()); 154 long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream()) 155 .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum(); 156 ScanMetrics scanMetrics = pair.getSecond(); 157 assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); 158 assertEquals(bytes, scanMetrics.countOfBytesInResults.get()); 159 assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get()); 160 // also assert a server side metric to ensure that we have published them into the client side 161 // metrics. 162 assertEquals(3, scanMetrics.countOfRowsScanned.get()); 163 } 164}