001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertTrue;
028
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.List;
034import java.util.Map;
035import java.util.concurrent.ForkJoinPool;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.PrivateCellUtil;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
041import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
042import org.apache.hadoop.hbase.testclassification.ClientTests;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.apache.hadoop.hbase.util.Pair;
046import org.junit.AfterClass;
047import org.junit.BeforeClass;
048import org.junit.ClassRule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.runner.RunWith;
052import org.junit.runners.Parameterized;
053import org.junit.runners.Parameterized.Parameter;
054import org.junit.runners.Parameterized.Parameters;
055
056import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
057
058@RunWith(Parameterized.class)
059@Category({ MediumTests.class, ClientTests.class })
060public class TestAsyncTableScanMetrics {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestAsyncTableScanMetrics.class);
065
066  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
067
068  private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
069
070  private static final byte[] CF = Bytes.toBytes("cf");
071
072  private static final byte[] CQ = Bytes.toBytes("cq");
073
074  private static final byte[] VALUE = Bytes.toBytes("value");
075
076  private static AsyncConnection CONN;
077
078  private static int NUM_REGIONS;
079
080  @FunctionalInterface
081  private interface ScanWithMetrics {
082    Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
083  }
084
085  @Parameter(0)
086  public String methodName;
087
088  @Parameter(1)
089  public ScanWithMetrics method;
090
091  @Parameters(name = "{index}: scan={0}")
092  public static List<Object[]> params() {
093    ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
094    ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
095    ScanWithMetrics doScanWithAsyncTableScanner =
096      TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
097    return Arrays.asList(new Object[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable },
098      new Object[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan },
099      new Object[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner });
100  }
101
102  @BeforeClass
103  public static void setUp() throws Exception {
104    UTIL.startMiniCluster(3);
105    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
106    // scan hits all the region and not all rows lie in a single region
107    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
108      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
109        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
110        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
111    }
112    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
113    NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
114  }
115
116  @AfterClass
117  public static void tearDown() throws Exception {
118    Closeables.close(CONN, true);
119    UTIL.shutdownMiniCluster();
120  }
121
122  private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
123    throws IOException, InterruptedException {
124    BufferingScanResultConsumer consumer = new BufferingScanResultConsumer();
125    CONN.getTable(TABLE_NAME).scan(scan, consumer);
126    List<Result> results = new ArrayList<>();
127    for (Result result; (result = consumer.take()) != null;) {
128      results.add(result);
129    }
130    return Pair.newPair(results, consumer.getScanMetrics());
131  }
132
133  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
134    throws Exception {
135    SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl();
136    CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
137    return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
138  }
139
140  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
141    throws IOException {
142    try (ResultScanner scanner =
143      CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
144      List<Result> results = new ArrayList<>();
145      for (Result result; (result = scanner.next()) != null;) {
146        results.add(result);
147      }
148      return Pair.newPair(results, scanner.getScanMetrics());
149    }
150  }
151
152  @Test
153  public void testScanMetricsDisabled() throws Exception {
154    Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
155    assertEquals(3, pair.getFirst().size());
156    // Assert no scan metrics
157    assertNull(pair.getSecond());
158  }
159
160  @Test
161  public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
162    Scan scan = new Scan();
163    scan.setScanMetricsEnabled(true);
164    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
165    List<Result> results = pair.getFirst();
166    assertEquals(3, results.size());
167    long bytes = getBytesOfResults(results);
168    ScanMetrics scanMetrics = pair.getSecond();
169    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
170    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
171    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
172    // Assert scan metrics have not been collected by region
173    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
174  }
175
176  @Test
177  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
178    Scan scan = new Scan();
179    scan.withStartRow(Bytes.toBytes("zzz1"), true);
180    scan.withStopRow(Bytes.toBytes("zzz1"), true);
181    scan.setEnableScanMetricsByRegion(true);
182    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
183    List<Result> results = pair.getFirst();
184    assertEquals(1, results.size());
185    long bytes = getBytesOfResults(results);
186    ScanMetrics scanMetrics = pair.getSecond();
187    assertEquals(1, scanMetrics.countOfRegions.get());
188    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
189    assertEquals(1, scanMetrics.countOfRPCcalls.get());
190    // Assert scan metrics by region were collected for the region scanned
191    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
192      scanMetrics.collectMetricsByRegion(false);
193    assertEquals(1, scanMetricsByRegion.size());
194    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
195      .entrySet()) {
196      ScanMetricsRegionInfo smri = entry.getKey();
197      Map<String, Long> metrics = entry.getValue();
198      assertNotNull(smri.getServerName());
199      assertNotNull(smri.getEncodedRegionName());
200      // Assert overall scan metrics and scan metrics by region should be equal as only 1 region
201      // was scanned.
202      assertEquals(scanMetrics.getMetricsMap(false), metrics);
203    }
204  }
205
206  @Test
207  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
208    Scan scan = new Scan();
209    scan.setEnableScanMetricsByRegion(true);
210    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
211    List<Result> results = pair.getFirst();
212    assertEquals(3, results.size());
213    long bytes = getBytesOfResults(results);
214    ScanMetrics scanMetrics = pair.getSecond();
215    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
216    assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
217    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
218    assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
219    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
220    assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
221    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
222    // Assert scan metrics by region were collected for the region scanned
223    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
224      scanMetrics.collectMetricsByRegion(false);
225    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
226    int rowsScannedAcrossAllRegions = 0;
227    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
228      .entrySet()) {
229      ScanMetricsRegionInfo smri = entry.getKey();
230      Map<String, Long> perRegionMetrics = entry.getValue();
231      assertNotNull(smri.getServerName());
232      assertNotNull(smri.getEncodedRegionName());
233      assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
234      if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
235        bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
236        assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
237        rowsScannedAcrossAllRegions++;
238      } else {
239        assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
240        assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
241      }
242    }
243    assertEquals(3, rowsScannedAcrossAllRegions);
244  }
245
246  static long getBytesOfResults(List<Result> results) {
247    return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
248      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
249  }
250}