001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
025import static org.junit.Assert.assertEquals;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertTrue;
028
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.List;
033import java.util.Map;
034import java.util.concurrent.atomic.AtomicInteger;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtil;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
040import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.AfterClass;
045import org.junit.Assert;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
052
053@Category({ MediumTests.class, ClientTests.class })
054public class TestAsyncTableScanMetricsWithScannerSuspending {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);
059
060  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
061
062  private static final TableName TABLE_NAME =
063    TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
064
065  private static final byte[] CF = Bytes.toBytes("cf");
066
067  private static final byte[] CQ = Bytes.toBytes("cq");
068
069  private static final byte[] VALUE = Bytes.toBytes("value");
070
071  private static AsyncConnection CONN;
072
073  @BeforeClass
074  public static void setUp() throws Exception {
075    UTIL.startMiniCluster(1);
076    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
077    // scan hits all the region and not all rows lie in a single region
078    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
079      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
080        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
081        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
082    }
083    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
084  }
085
086  @AfterClass
087  public static void tearDown() throws Exception {
088    Closeables.close(CONN, true);
089    UTIL.shutdownMiniCluster();
090  }
091
092  @Test
093  public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
094    // Setup scan
095    Scan scan = new Scan();
096    scan.withStartRow(Bytes.toBytes("xxx1"), true);
097    scan.withStopRow(Bytes.toBytes("zzz1"), true);
098    scan.setEnableScanMetricsByRegion(true);
099    scan.setMaxResultSize(1);
100
101    // Prepare scanner
102    final AtomicInteger rowsReadCounter = new AtomicInteger(0);
103    AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
104      @Override
105      public void onNext(Result[] results, ScanController controller) {
106        rowsReadCounter.addAndGet(results.length);
107        super.onNext(results, controller);
108      }
109    };
110
111    // Do the scan so that rows get loaded in the scanner (consumer)
112    CONN.getTable(TABLE_NAME).scan(scan, scanner);
113
114    List<Result> results = new ArrayList<>();
115    int expectedTotalRows = 3;
116    // Assert that only 1 row has been loaded so far as maxCacheSize is set to 1 byte
117    for (int i = 1; i <= expectedTotalRows; i++) {
118      UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
119
120        @Override
121        public boolean evaluate() throws Exception {
122          return scanner.isSuspended();
123        }
124
125        @Override
126        public String explainFailure() throws Exception {
127          return "The given scanner has been suspended in time";
128        }
129      });
130      assertTrue(scanner.isSuspended());
131      assertEquals(i, rowsReadCounter.get());
132      results.add(scanner.next());
133    }
134    Assert.assertNull(scanner.next());
135
136    // Assert on overall scan metrics and scan metrics by region
137    ScanMetrics scanMetrics = scanner.getScanMetrics();
138    // Assert on overall scan metrics
139    long bytes = getBytesOfResults(results);
140    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
141    assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
142    assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
143    // 1 Extra RPC call per region where no row is returned but moreResultsInRegion is set to false
144    assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
145    // Assert scan metrics by region were collected for the region scanned
146    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
147      scanMetrics.collectMetricsByRegion(false);
148    assertEquals(3, scanMetricsByRegion.size());
149    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
150      .entrySet()) {
151      ScanMetricsRegionInfo smri = entry.getKey();
152      Map<String, Long> perRegionMetrics = entry.getValue();
153      assertNotNull(smri.getServerName());
154      assertNotNull(smri.getEncodedRegionName());
155      assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
156      assertEquals(1, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
157      bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
158      assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
159      assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
160    }
161  }
162}