001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertNull; 023 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.List; 027import java.util.concurrent.CompletableFuture; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; 031import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource; 032import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; 033import org.apache.hadoop.hbase.testclassification.ClientTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.JVMClusterUtil; 037import org.junit.jupiter.api.AfterAll; 038import org.junit.jupiter.api.BeforeAll; 039import org.junit.jupiter.api.Tag; 040import org.junit.jupiter.api.Test; 041 042import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 043 044@Tag(MediumTests.TAG) 045@Tag(ClientTests.TAG) 046public class TestAsyncTableQueryMetrics { 047 048 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 049 050 private static final TableName TABLE_NAME = TableName.valueOf("ResultMetrics"); 051 052 private static final byte[] CF = Bytes.toBytes("cf"); 053 054 private static final byte[] CQ = Bytes.toBytes("cq"); 055 056 private static final byte[] VALUE = Bytes.toBytes("value"); 057 058 private static final byte[] ROW_1 = Bytes.toBytes("zzz1"); 059 private static final byte[] ROW_2 = Bytes.toBytes("zzz2"); 060 private static final byte[] ROW_3 = Bytes.toBytes("zzz3"); 061 062 private static AsyncConnection CONN; 063 064 @BeforeAll 065 public static void setUp() throws Exception { 066 UTIL.startMiniCluster(3); 067 // Create 3 rows in the table, with rowkeys starting with "zzz*" so that 068 // scan are forced to hit all the regions. 069 try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 070 table.put(Arrays.asList(new Put(ROW_1).addColumn(CF, CQ, VALUE), 071 new Put(ROW_2).addColumn(CF, CQ, VALUE), new Put(ROW_3).addColumn(CF, CQ, VALUE))); 072 } 073 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 074 CONN.getAdmin().flush(TABLE_NAME).join(); 075 } 076 077 @AfterAll 078 public static void tearDown() throws Exception { 079 Closeables.close(CONN, true); 080 UTIL.shutdownMiniCluster(); 081 } 082 083 @Test 084 public void itTestsGets() throws Exception { 085 // Test a single Get 086 Get g1 = new Get(ROW_1); 087 g1.setQueryMetricsEnabled(true); 088 089 long bbs = getClusterBlockBytesScanned(); 090 Result result = CONN.getTable(TABLE_NAME).get(g1).get(); 091 bbs += result.getMetrics().getBlockBytesScanned(); 092 assertNotNull(result.getMetrics()); 093 assertEquals(getClusterBlockBytesScanned(), bbs); 094 095 // Test multigets 096 Get g2 = new Get(ROW_2); 097 g2.setQueryMetricsEnabled(true); 098 099 Get g3 = new Get(ROW_3); 100 g3.setQueryMetricsEnabled(true); 101 102 List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); 103 104 for (CompletableFuture<Result> future : futures) { 105 result = future.join(); 106 assertNotNull(result.getMetrics()); 107 bbs += result.getMetrics().getBlockBytesScanned(); 108 } 109 110 assertEquals(getClusterBlockBytesScanned(), bbs); 111 } 112 113 @Test 114 public void itTestsDefaultGetNoMetrics() throws Exception { 115 // Test a single Get 116 Get g1 = new Get(ROW_1); 117 118 Result result = CONN.getTable(TABLE_NAME).get(g1).get(); 119 assertNull(result.getMetrics()); 120 121 // Test multigets 122 Get g2 = new Get(ROW_2); 123 Get g3 = new Get(ROW_3); 124 List<CompletableFuture<Result>> futures = CONN.getTable(TABLE_NAME).get(List.of(g1, g2, g3)); 125 futures.forEach(f -> assertNull(f.join().getMetrics())); 126 127 } 128 129 @Test 130 public void itTestsScans() { 131 Scan scan = new Scan(); 132 scan.setQueryMetricsEnabled(true); 133 134 long bbs = getClusterBlockBytesScanned(); 135 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { 136 for (Result result : scanner) { 137 assertNotNull(result.getMetrics()); 138 bbs += result.getMetrics().getBlockBytesScanned(); 139 assertEquals(getClusterBlockBytesScanned(), bbs); 140 } 141 } 142 } 143 144 @Test 145 public void itTestsDefaultScanNoMetrics() { 146 Scan scan = new Scan(); 147 148 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(scan)) { 149 for (Result result : scanner) { 150 assertNull(result.getMetrics()); 151 } 152 } 153 } 154 155 @Test 156 public void itTestsAtomicOperations() { 157 CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE) 158 .queryMetricsEnabled(true).build(new Put(ROW_1).addColumn(CF, CQ, VALUE)); 159 160 long bbs = getClusterBlockBytesScanned(); 161 CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join(); 162 QueryMetrics metrics = result.getMetrics(); 163 164 assertNotNull(metrics); 165 assertEquals(getClusterBlockBytesScanned(), bbs + metrics.getBlockBytesScanned()); 166 167 bbs = getClusterBlockBytesScanned(); 168 List<CheckAndMutate> batch = new ArrayList<>(); 169 batch.add(cam); 170 batch.add(CheckAndMutate.newBuilder(ROW_2).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE) 171 .build(new Put(ROW_2).addColumn(CF, CQ, VALUE))); 172 batch.add(CheckAndMutate.newBuilder(ROW_3).queryMetricsEnabled(true).ifEquals(CF, CQ, VALUE) 173 .build(new Put(ROW_3).addColumn(CF, CQ, VALUE))); 174 175 List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join(); 176 long totalBbs = res.stream() 177 .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum(); 178 assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs); 179 180 bbs = getClusterBlockBytesScanned(); 181 182 // flush to force fetch from disk 183 CONN.getAdmin().flush(TABLE_NAME).join(); 184 List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch); 185 186 totalBbs = futures.stream().map(CompletableFuture::join) 187 .mapToLong(r -> ((CheckAndMutateResult) r).getMetrics().getBlockBytesScanned()).sum(); 188 assertEquals(getClusterBlockBytesScanned(), bbs + totalBbs); 189 } 190 191 @Test 192 public void itTestsDefaultAtomicOperations() { 193 CheckAndMutate cam = CheckAndMutate.newBuilder(ROW_1).ifEquals(CF, CQ, VALUE) 194 .build(new Put(ROW_1).addColumn(CF, CQ, VALUE)); 195 196 CheckAndMutateResult result = CONN.getTable(TABLE_NAME).checkAndMutate(cam).join(); 197 QueryMetrics metrics = result.getMetrics(); 198 199 assertNull(metrics); 200 201 List<CheckAndMutate> batch = new ArrayList<>(); 202 batch.add(cam); 203 batch.add(CheckAndMutate.newBuilder(ROW_2).ifEquals(CF, CQ, VALUE) 204 .build(new Put(ROW_2).addColumn(CF, CQ, VALUE))); 205 batch.add(CheckAndMutate.newBuilder(ROW_3).ifEquals(CF, CQ, VALUE) 206 .build(new Put(ROW_3).addColumn(CF, CQ, VALUE))); 207 208 List<Object> res = CONN.getTable(TABLE_NAME).batchAll(batch).join(); 209 for (Object r : res) { 210 assertNull(((CheckAndMutateResult) r).getMetrics()); 211 } 212 213 // flush to force fetch from disk 214 CONN.getAdmin().flush(TABLE_NAME).join(); 215 List<CompletableFuture<Object>> futures = CONN.getTable(TABLE_NAME).batch(batch); 216 217 for (CompletableFuture<Object> future : futures) { 218 Object r = future.join(); 219 assertNull(((CheckAndMutateResult) r).getMetrics()); 220 } 221 } 222 223 private static long getClusterBlockBytesScanned() { 224 long bbs = 0L; 225 226 for (JVMClusterUtil.RegionServerThread rs : UTIL.getHBaseCluster().getRegionServerThreads()) { 227 MetricsRegionServer metrics = rs.getRegionServer().getMetrics(); 228 MetricsRegionServerSourceImpl source = 229 (MetricsRegionServerSourceImpl) metrics.getMetricsSource(); 230 231 bbs += source.getMetricsRegistry() 232 .getCounter(MetricsRegionServerSource.BLOCK_BYTES_SCANNED_KEY, 0L).value(); 233 } 234 235 return bbs; 236 } 237}