001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertNotNull;
027import static org.junit.jupiter.api.Assertions.assertNull;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.Collections;
033import java.util.List;
034import java.util.Map;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
040import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.jupiter.api.AfterAll;
045import org.junit.jupiter.api.BeforeAll;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
050
051@Tag(MediumTests.TAG)
052@Tag(ClientTests.TAG)
053public class TestAsyncTableScanMetricsWithScannerSuspending {
054
055  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
056
057  private static final TableName TABLE_NAME =
058    TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
059
060  private static final byte[] CF = Bytes.toBytes("cf");
061
062  private static final byte[] CQ = Bytes.toBytes("cq");
063
064  private static final byte[] VALUE = Bytes.toBytes("value");
065
066  private static AsyncConnection CONN;
067
068  @BeforeAll
069  public static void setUp() throws Exception {
070    UTIL.startMiniCluster(1);
071    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
072    // scan hits all the region and not all rows lie in a single region
073    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
074      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
075        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
076        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
077    }
078    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
079  }
080
081  @AfterAll
082  public static void tearDown() throws Exception {
083    Closeables.close(CONN, true);
084    UTIL.shutdownMiniCluster();
085  }
086
087  @Test
088  public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
089    // Setup scan
090    Scan scan = new Scan();
091    scan.withStartRow(Bytes.toBytes("xxx1"), true);
092    scan.withStopRow(Bytes.toBytes("zzz1"), true);
093    scan.setEnableScanMetricsByRegion(true);
094    scan.setMaxResultSize(1);
095
096    // Prepare scanner
097    final AtomicInteger rowsReadCounter = new AtomicInteger(0);
098    AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
099      @Override
100      public void onNext(Result[] results, ScanController controller) {
101        rowsReadCounter.addAndGet(results.length);
102        super.onNext(results, controller);
103      }
104    };
105
106    // Do the scan so that rows get loaded in the scanner (consumer)
107    CONN.getTable(TABLE_NAME).scan(scan, scanner);
108
109    List<Result> results = new ArrayList<>();
110    int expectedTotalRows = 3;
111    // Assert that only 1 row has been loaded so far as maxCacheSize is set to 1 byte
112    for (int i = 1; i <= expectedTotalRows; i++) {
113      UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
114
115        @Override
116        public boolean evaluate() throws Exception {
117          return scanner.isSuspended();
118        }
119
120        @Override
121        public String explainFailure() throws Exception {
122          return "The given scanner has been suspended in time";
123        }
124      });
125      assertTrue(scanner.isSuspended());
126      assertEquals(i, rowsReadCounter.get());
127      results.add(scanner.next());
128    }
129    assertNull(scanner.next());
130
131    // Assert on overall scan metrics and scan metrics by region
132    ScanMetrics scanMetrics = scanner.getScanMetrics();
133    // Assert on overall scan metrics
134    long bytes = getBytesOfResults(results);
135    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
136    assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
137    assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
138    // 1 Extra RPC call per region where no row is returned but moreResultsInRegion is set to false
139    assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
140    // Assert scan metrics by region were collected for the region scanned
141    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
142      scanMetrics.collectMetricsByRegion(false);
143    assertEquals(3, scanMetricsByRegion.size());
144    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
145      .entrySet()) {
146      ScanMetricsRegionInfo smri = entry.getKey();
147      Map<String, Long> perRegionMetrics = entry.getValue();
148      assertNotNull(smri.getServerName());
149      assertNotNull(smri.getEncodedRegionName());
150      assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
151      assertEquals(1, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
152      bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
153      assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
154      assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
155    }
156  }
157}