/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
024import static org.hamcrest.MatcherAssert.assertThat;
025import static org.hamcrest.Matchers.both;
026import static org.hamcrest.Matchers.greaterThan;
027import static org.hamcrest.Matchers.lessThan;
028import static org.junit.jupiter.api.Assertions.assertEquals;
029import static org.junit.jupiter.api.Assertions.assertNotNull;
030import static org.junit.jupiter.api.Assertions.assertNull;
031import static org.junit.jupiter.api.Assertions.assertTrue;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.Arrays;
036import java.util.Collections;
037import java.util.List;
038import java.util.Map;
039import java.util.concurrent.ForkJoinPool;
040import java.util.concurrent.TimeUnit;
041import java.util.stream.Stream;
042import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.PrivateCellUtil;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
047import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
048import org.apache.hadoop.hbase.testclassification.ClientTests;
049import org.apache.hadoop.hbase.testclassification.MediumTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.Pair;
052import org.junit.jupiter.api.AfterAll;
053import org.junit.jupiter.api.BeforeAll;
054import org.junit.jupiter.api.Tag;
055import org.junit.jupiter.api.TestTemplate;
056import org.junit.jupiter.params.provider.Arguments;
057
058import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
059
060@Tag(MediumTests.TAG)
061@Tag(ClientTests.TAG)
062@HBaseParameterizedTestTemplate(name = "{index}: scan={0}")
063public class TestAsyncTableScanMetrics {
064
065  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
066
067  private static final TableName TABLE_NAME = TableName.valueOf("ScanMetrics");
068
069  private static final byte[] CF = Bytes.toBytes("cf");
070
071  private static final byte[] CQ = Bytes.toBytes("cq");
072
073  private static final byte[] VALUE = Bytes.toBytes("value");
074
075  private static AsyncConnection CONN;
076
077  private static int NUM_REGIONS;
078
079  @FunctionalInterface
080  private interface ScanWithMetrics {
081    Pair<List<Result>, ScanMetrics> scan(Scan scan) throws Exception;
082  }
083
084  private ScanWithMetrics method;
085
086  // methodName is just for naming
087  public TestAsyncTableScanMetrics(String methodName, ScanWithMetrics method) {
088    this.method = method;
089  }
090
091  public static Stream<Arguments> parameters() {
092    ScanWithMetrics doScanWithRawAsyncTable = TestAsyncTableScanMetrics::doScanWithRawAsyncTable;
093    ScanWithMetrics doScanWithAsyncTableScan = TestAsyncTableScanMetrics::doScanWithAsyncTableScan;
094    ScanWithMetrics doScanWithAsyncTableScanner =
095      TestAsyncTableScanMetrics::doScanWithAsyncTableScanner;
096    return Stream.of(Arguments.of("doScanWithRawAsyncTable", doScanWithRawAsyncTable),
097      Arguments.of("doScanWithAsyncTableScan", doScanWithAsyncTableScan),
098      Arguments.of("doScanWithAsyncTableScanner", doScanWithAsyncTableScanner));
099  }
100
101  @BeforeAll
102  public static void setUp() throws Exception {
103    UTIL.startMiniCluster(3);
104    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
105    // scan hits all the region and not all rows lie in a single region
106    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
107      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
108        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
109        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
110    }
111    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
112    NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
113  }
114
115  @AfterAll
116  public static void tearDown() throws Exception {
117    Closeables.close(CONN, true);
118    UTIL.shutdownMiniCluster();
119  }
120
121  private static Pair<List<Result>, ScanMetrics> doScanWithRawAsyncTable(Scan scan)
122    throws IOException, InterruptedException {
123    BufferingScanResultConsumer consumer = new BufferingScanResultConsumer();
124    CONN.getTable(TABLE_NAME).scan(scan, consumer);
125    List<Result> results = new ArrayList<>();
126    for (Result result; (result = consumer.take()) != null;) {
127      results.add(result);
128    }
129    return Pair.newPair(results, consumer.getScanMetrics());
130  }
131
132  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScan(Scan scan)
133    throws Exception {
134    SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl();
135    CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer);
136    return Pair.newPair(consumer.getAll(), consumer.getScanMetrics());
137  }
138
139  private static Pair<List<Result>, ScanMetrics> doScanWithAsyncTableScanner(Scan scan)
140    throws IOException {
141    try (ResultScanner scanner =
142      CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).getScanner(scan)) {
143      List<Result> results = new ArrayList<>();
144      for (Result result; (result = scanner.next()) != null;) {
145        results.add(result);
146      }
147      return Pair.newPair(results, scanner.getScanMetrics());
148    }
149  }
150
151  @TestTemplate
152  public void testScanMetricsDisabled() throws Exception {
153    Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
154    assertEquals(3, pair.getFirst().size());
155    // Assert no scan metrics
156    assertNull(pair.getSecond());
157  }
158
159  @TestTemplate
160  public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
161    Scan scan = new Scan();
162    scan.setScanMetricsEnabled(true);
163    long startNanos = System.nanoTime();
164    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
165    long endNanos = System.nanoTime();
166    List<Result> results = pair.getFirst();
167    assertEquals(3, results.size());
168    long bytes = getBytesOfResults(results);
169    ScanMetrics scanMetrics = pair.getSecond();
170    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
171    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
172    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
173    // Assert scan metrics have not been collected by region
174    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
175    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
176      both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos))));
177  }
178
179  @TestTemplate
180  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
181    Scan scan = new Scan();
182    scan.withStartRow(Bytes.toBytes("zzz1"), true);
183    scan.withStopRow(Bytes.toBytes("zzz1"), true);
184    scan.setEnableScanMetricsByRegion(true);
185    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
186    List<Result> results = pair.getFirst();
187    assertEquals(1, results.size());
188    long bytes = getBytesOfResults(results);
189    ScanMetrics scanMetrics = pair.getSecond();
190    assertEquals(1, scanMetrics.countOfRegions.get());
191    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
192    assertEquals(1, scanMetrics.countOfRPCcalls.get());
193    // Assert scan metrics by region were collected for the region scanned
194    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
195      scanMetrics.collectMetricsByRegion(false);
196    assertEquals(1, scanMetricsByRegion.size());
197    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
198      .entrySet()) {
199      ScanMetricsRegionInfo smri = entry.getKey();
200      Map<String, Long> metrics = entry.getValue();
201      assertNotNull(smri.getServerName());
202      assertNotNull(smri.getEncodedRegionName());
203      // Assert overall scan metrics and scan metrics by region should be equal as only 1 region
204      // was scanned.
205      assertEquals(scanMetrics.getMetricsMap(false), metrics);
206    }
207    // we only have 1 rpc call so there is no millis 'between nexts'
208    assertEquals(0, scanMetrics.sumOfMillisSecBetweenNexts.get());
209  }
210
211  @TestTemplate
212  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
213    Scan scan = new Scan();
214    scan.setEnableScanMetricsByRegion(true);
215    long startNanos = System.nanoTime();
216    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
217    long endNanos = System.nanoTime();
218    List<Result> results = pair.getFirst();
219    assertEquals(3, results.size());
220    long bytes = getBytesOfResults(results);
221    ScanMetrics scanMetrics = pair.getSecond();
222    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
223    assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
224    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
225    assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
226    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
227    assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
228    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
229    // Assert scan metrics by region were collected for the region scanned
230    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
231      scanMetrics.collectMetricsByRegion(false);
232    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
233    int rowsScannedAcrossAllRegions = 0;
234    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
235      .entrySet()) {
236      ScanMetricsRegionInfo smri = entry.getKey();
237      Map<String, Long> perRegionMetrics = entry.getValue();
238      assertNotNull(smri.getServerName());
239      assertNotNull(smri.getEncodedRegionName());
240      assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
241      if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
242        bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
243        assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
244        rowsScannedAcrossAllRegions++;
245      } else {
246        assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
247        assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
248      }
249    }
250    assertEquals(3, rowsScannedAcrossAllRegions);
251    assertThat(scanMetrics.sumOfMillisSecBetweenNexts.get(),
252      both(greaterThan(0L)).and(lessThan(TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos))));
253  }
254
255  static long getBytesOfResults(List<Result> results) {
256    return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
257      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
258  }
259}