001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults; 021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME; 022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; 023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME; 024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertNotNull; 027import static org.junit.jupiter.api.Assertions.assertNull; 028import static org.junit.jupiter.api.Assertions.assertTrue; 029 030import java.util.ArrayList; 031import java.util.Arrays; 032import java.util.Collections; 033import java.util.List; 034import java.util.Map; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.hbase.HBaseTestingUtil; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.Waiter; 039import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 040import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; 041import org.apache.hadoop.hbase.testclassification.ClientTests; 042import org.apache.hadoop.hbase.testclassification.MediumTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.junit.jupiter.api.AfterAll; 045import org.junit.jupiter.api.BeforeAll; 046import org.junit.jupiter.api.Tag; 047import org.junit.jupiter.api.Test; 048 049import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 050 051@Tag(MediumTests.TAG) 052@Tag(ClientTests.TAG) 053public class TestAsyncTableScanMetricsWithScannerSuspending { 054 055 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 056 057 private static final TableName TABLE_NAME = 058 TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName()); 059 060 private static final byte[] CF = Bytes.toBytes("cf"); 061 062 private static final byte[] CQ = Bytes.toBytes("cq"); 063 064 private static final byte[] VALUE = Bytes.toBytes("value"); 065 066 private static AsyncConnection CONN; 067 068 @BeforeAll 069 public static void setUp() throws Exception { 070 UTIL.startMiniCluster(1); 071 // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that 072 // scan hits all the region and not all rows lie in a single region 073 try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 074 table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), 075 new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), 076 new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); 077 } 078 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 079 } 080 081 @AfterAll 082 public static void tearDown() throws Exception { 083 Closeables.close(CONN, true); 084 UTIL.shutdownMiniCluster(); 085 } 086 087 @Test 088 public void testScanMetricsByRegionWithScannerSuspending() throws Exception { 089 // Setup scan 090 Scan scan = new Scan(); 091 scan.withStartRow(Bytes.toBytes("xxx1"), true); 092 scan.withStopRow(Bytes.toBytes("zzz1"), true); 093 scan.setEnableScanMetricsByRegion(true); 094 scan.setMaxResultSize(1); 095 096 // Prepare scanner 097 final AtomicInteger rowsReadCounter = new AtomicInteger(0); 098 AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) { 099 @Override 100 public void onNext(Result[] results, ScanController controller) { 101 rowsReadCounter.addAndGet(results.length); 102 super.onNext(results, controller); 103 } 104 }; 105 106 // Do the scan so that rows get loaded in the scanner (consumer) 107 CONN.getTable(TABLE_NAME).scan(scan, scanner); 108 109 List<Result> results = new ArrayList<>(); 110 int expectedTotalRows = 3; 111 // Assert that only 1 row has been loaded so far as maxCacheSize is set to 1 byte 112 for (int i = 1; i <= expectedTotalRows; i++) { 113 UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() { 114 115 @Override 116 public boolean evaluate() throws Exception { 117 return scanner.isSuspended(); 118 } 119 120 @Override 121 public String explainFailure() throws Exception { 122 return "The given scanner has been suspended in time"; 123 } 124 }); 125 assertTrue(scanner.isSuspended()); 126 assertEquals(i, rowsReadCounter.get()); 127 results.add(scanner.next()); 128 } 129 assertNull(scanner.next()); 130 131 // Assert on overall scan metrics and scan metrics by region 132 ScanMetrics scanMetrics = scanner.getScanMetrics(); 133 // Assert on overall scan metrics 134 long bytes = getBytesOfResults(results); 135 Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false); 136 assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME)); 137 assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME)); 138 // 1 Extra RPC call per region where no row is returned but moreResultsInRegion is set to false 139 assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME)); 140 // Assert scan metrics by region were collected for the region scanned 141 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 142 scanMetrics.collectMetricsByRegion(false); 143 assertEquals(3, scanMetricsByRegion.size()); 144 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 145 .entrySet()) { 146 ScanMetricsRegionInfo smri = entry.getKey(); 147 Map<String, Long> perRegionMetrics = entry.getValue(); 148 assertNotNull(smri.getServerName()); 149 assertNotNull(smri.getEncodedRegionName()); 150 assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME)); 151 assertEquals(1, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 152 bytes = getBytesOfResults(Collections.singletonList(results.get(0))); 153 assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME)); 154 assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME)); 155 } 156 } 157}