/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Checks the overall and per-region scan metrics reported by an {@link AsyncTableResultScanner}
 * that is driven one row at a time: the scanner is created with a max cache size of 1 and the
 * test waits for it to report {@code isSuspended()} before pulling each row.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableScanMetricsWithScannerSuspending {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME =
    TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  private static AsyncConnection CONN;

  /**
   * Starts a single-node mini cluster, creates the multi-region test table, writes three rows and
   * opens the shared async connection.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.startMiniCluster(1);
    // Create 3 rows with row keys "xxx1", "yyy1" and "zzz1" so that the rows are spread over
    // several regions of the multi-region table instead of all lying in a single region. The
    // test below expects exactly one row per scanned region.
    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
    }
    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
  }

  /** Closes the shared connection (swallowing any IOException) and stops the mini cluster. */
  @AfterClass
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Scans the three rows with per-region scan metrics enabled, consuming one row each time the
   * scanner reports it is suspended, and then asserts on the overall and per-region metrics.
   */
  @Test
  public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
    // Setup scan
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("xxx1"), true);
    scan.withStopRow(Bytes.toBytes("zzz1"), true);
    scan.setEnableScanMetricsByRegion(true);
    scan.setMaxResultSize(1);

    final int expectedTotalRows = 3;
    final AtomicInteger rowsReadCounter = new AtomicInteger(0);
    List<Result> results = new ArrayList<>(expectedTotalRows);

    // Prepare scanner. try-with-resources makes sure the scan is released even if one of the
    // assertions below fails while the scanner is still suspended.
    try (AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
      @Override
      public void onNext(Result[] results, ScanController controller) {
        // Count rows as they are delivered to the consumer, before they are handed to next()
        rowsReadCounter.addAndGet(results.length);
        super.onNext(results, controller);
      }
    }) {
      // Do the scan so that rows get loaded in the scanner (consumer)
      CONN.getTable(TABLE_NAME).scan(scan, scanner);

      // Before each next(), exactly one more row must have been delivered, as maxCacheSize is
      // set to 1 byte and the scanner is expected to suspend after every row
      for (int i = 1; i <= expectedTotalRows; i++) {
        waitUntilSuspended(scanner);
        assertTrue(scanner.isSuspended());
        assertEquals(i, rowsReadCounter.get());
        results.add(scanner.next());
      }
      Assert.assertNull(scanner.next());

      // Assert on overall scan metrics and scan metrics by region
      ScanMetrics scanMetrics = scanner.getScanMetrics();
      // Assert on overall scan metrics
      long totalBytes = getBytesOfResults(results);
      Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
      assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
      assertEquals(totalBytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
      // 1 Extra RPC call per region where no row is returned but moreResultsInRegion is set to
      // false
      assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));

      // Assert scan metrics by region were collected for the region scanned
      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
        scanMetrics.collectMetricsByRegion(false);
      assertEquals(3, scanMetricsByRegion.size());
      // Every region is compared against the size of the first result: the three rows were
      // written with equal-length row keys and the same family, qualifier and value.
      long bytesPerRegion = getBytesOfResults(Collections.singletonList(results.get(0)));
      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
        .entrySet()) {
        ScanMetricsRegionInfo smri = entry.getKey();
        Map<String, Long> perRegionMetrics = entry.getValue();
        assertNotNull(smri.getServerName());
        assertNotNull(smri.getEncodedRegionName());
        assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
        assertEquals(1, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
        assertEquals(bytesPerRegion, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
        assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
      }
    }
  }

  /** Polls every 100 ms, for up to 10 seconds, until the given scanner reports suspension. */
  private static void waitUntilSuspended(AsyncTableResultScanner scanner) throws Exception {
    UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return scanner.isSuspended();
      }

      @Override
      public String explainFailure() throws Exception {
        // Reported when the wait times out, so it must describe the failure, not the success
        return "The given scanner has not been suspended in time";
      }
    });
  }
}