001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
025import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
026import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
027import static org.junit.jupiter.api.Assertions.assertEquals;
028import static org.junit.jupiter.api.Assertions.assertFalse;
029import static org.junit.jupiter.api.Assertions.assertNotEquals;
030import static org.junit.jupiter.api.Assertions.assertNotNull;
031import static org.junit.jupiter.api.Assertions.assertNull;
032import static org.junit.jupiter.api.Assertions.assertTrue;
033
034import java.io.IOException;
035import java.util.Arrays;
036import java.util.HashMap;
037import java.util.HashSet;
038import java.util.List;
039import java.util.Map;
040import java.util.Random;
041import java.util.Set;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.Executors;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.atomic.AtomicLong;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
051import org.apache.hadoop.hbase.HBaseTestingUtil;
052import org.apache.hadoop.hbase.HConstants;
053import org.apache.hadoop.hbase.HRegionLocation;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.hadoop.hbase.TableName;
056import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
057import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
058import org.apache.hadoop.hbase.testclassification.ClientTests;
059import org.apache.hadoop.hbase.testclassification.LargeTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.FutureUtils;
062import org.junit.jupiter.api.AfterAll;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.Tag;
065import org.junit.jupiter.api.TestTemplate;
066import org.junit.jupiter.params.provider.Arguments;
067
068import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
069
070@Tag(ClientTests.TAG)
071@Tag(LargeTests.TAG)
072@HBaseParameterizedTestTemplate(name = "{index}: scanner={0}")
073public class TestTableScanMetrics {
074
075  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
076
077  private static final TableName TABLE_NAME =
078    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
079
080  private static final byte[] CF = Bytes.toBytes("cf");
081
082  private static final byte[] CQ = Bytes.toBytes("cq");
083
084  private static final byte[] VALUE = Bytes.toBytes("value");
085
086  private static final Random RAND = new Random(11);
087
088  private static int NUM_REGIONS;
089
090  private static Connection CONN;
091
092  public static Stream<Arguments> parameters() {
093    return Stream.of(Arguments.of("ForwardScanner", new Scan()),
094      Arguments.of("ReverseScanner", new Scan().setReversed(true)));
095  }
096
097  private Scan originalScan;
098
099  public TestTableScanMetrics(String scannerName, Scan originalScan) {
100    this.originalScan = originalScan;
101  }
102
103  @BeforeAll
104  public static void setUp() throws Exception {
105    // Start the minicluster
106    TEST_UTIL.startMiniCluster(2);
107    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
108    // scan hits all the region and not all rows lie in a single region
109    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
110      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
111        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
112        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
113    }
114    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
115    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
116  }
117
118  @AfterAll
119  public static void tearDown() throws Exception {
120    Closeables.close(CONN, true);
121    TEST_UTIL.shutdownMiniCluster();
122  }
123
124  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
125    Scan scan = new Scan(originalScan);
126    if (originalScan.isReversed()) {
127      scan.withStartRow(largerRow, true);
128      scan.withStopRow(smallerRow, true);
129    } else {
130      scan.withStartRow(smallerRow, true);
131      scan.withStopRow(largerRow, true);
132    }
133    return scan;
134  }
135
136  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
137    throws IOException {
138    int countOfRows = 0;
139    ScanMetrics scanMetrics;
140    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
141      for (Result result : scanner) {
142        assertFalse(result.isEmpty());
143        countOfRows++;
144      }
145      scanMetrics = scanner.getScanMetrics();
146    }
147    assertEquals(expectedCount, countOfRows);
148    return scanMetrics;
149  }
150
151  @TestTemplate
152  public void testScanMetricsDisabled() throws Exception {
153    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
154    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
155    assertNull(scanMetrics);
156  }
157
158  @TestTemplate
159  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
160    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
161    scan.setScanMetricsEnabled(true);
162    int expectedRowsScanned = 3;
163    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
164    assertNotNull(scanMetrics);
165    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
166    // The test setup is such that we have 1 row per region in the scan range
167    assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
168    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
169    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
170  }
171
172  @TestTemplate
173  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
174    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
175    scan.setScanMetricsEnabled(true);
176    int expectedRowsScanned = 3;
177    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
178    assertNotNull(scanMetrics);
179    // By default counters are collected with reset as true
180    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
181    assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
182    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
183    // Subsequent call to get scan metrics map should show all counters as 0
184    assertEquals(0, scanMetrics.countOfRegions.get());
185    assertEquals(0, scanMetrics.countOfRowsScanned.get());
186  }
187
188  @TestTemplate
189  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
190    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
191    scan.setEnableScanMetricsByRegion(true);
192    int expectedRowsScanned = 1;
193    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
194    assertNotNull(scanMetrics);
195    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
196    assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
197    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
198    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
199      scanMetrics.collectMetricsByRegion(false);
200    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
201    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
202      .entrySet()) {
203      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
204      metricsMap = entry.getValue();
205      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
206      assertNotNull(scanMetricsRegionInfo.getServerName());
207      // As we are scanning single row so, overall scan metrics will match per region scan metrics
208      assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
209      assertEquals(expectedRowsScanned,
210        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
211    }
212  }
213
214  @TestTemplate
215  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
216    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
217    scan.setEnableScanMetricsByRegion(true);
218    int expectedRowsScanned = 3;
219    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
220    assertNotNull(scanMetrics);
221    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
222    assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
223    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
224      scanMetrics.collectMetricsByRegion(false);
225    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
226    int rowsScannedAcrossAllRegions = 0;
227    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
228      .entrySet()) {
229      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
230      Map<String, Long> metricsMap = entry.getValue();
231      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
232      assertNotNull(scanMetricsRegionInfo.getServerName());
233      assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
234      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
235        rowsScannedAcrossAllRegions++;
236      } else {
237        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
238      }
239    }
240    assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
241  }
242
243  @TestTemplate
244  public void testScanMetricsByRegionReset() throws Exception {
245    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
246    scan.setEnableScanMetricsByRegion(true);
247    int expectedRowsScanned = 3;
248    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
249    assertNotNull(scanMetrics);
250
251    // Retrieve scan metrics by region as a map and reset
252    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
253      scanMetrics.collectMetricsByRegion();
254    // We scan 1 row per region
255    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
256    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
257      .entrySet()) {
258      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
259      Map<String, Long> metricsMap = entry.getValue();
260      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
261      assertNotNull(scanMetricsRegionInfo.getServerName());
262      assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
263      assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
264    }
265
266    // Scan metrics have already been reset and now all counters should be 0
267    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
268    // Size of map should be same as earlier
269    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
270    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
271      .entrySet()) {
272      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
273      Map<String, Long> metricsMap = entry.getValue();
274      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
275      assertNotNull(scanMetricsRegionInfo.getServerName());
276      // Counters should have been reset to 0
277      assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
278      assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
279    }
280  }
281
282  @TestTemplate
283  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
284    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
285    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
286      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
287    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
288      TEST_UTIL.loadTable(table, CF);
289
290      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
291
292      // Trigger two concurrent threads one of which scans the table and other periodically
293      // collects the scan metrics (along with resetting the counters to 0).
294      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
295      scan.setEnableScanMetricsByRegion(true);
296      scan.setCaching(2);
297      try (ResultScanner rs = table.getScanner(scan)) {
298        ScanMetrics scanMetrics = rs.getScanMetrics();
299        AtomicInteger rowsScanned = new AtomicInteger(0);
300        CountDownLatch latch = new CountDownLatch(1);
301        Runnable tableScanner = new Runnable() {
302          public void run() {
303            for (Result r : rs) {
304              assertFalse(r.isEmpty());
305              rowsScanned.incrementAndGet();
306            }
307            latch.countDown();
308          }
309        };
310        Runnable metricsCollector =
311          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
312        executor.execute(tableScanner);
313        executor.execute(metricsCollector);
314        latch.await();
315        // Merge leftover scan metrics
316        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
317          concurrentScanMetricsByRegion);
318        assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
319      }
320
321      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
322
323      // Collect scan metrics by region from single thread. Assert that concurrent scan
324      // and metrics collection works as expected.
325      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
326      scan.setEnableScanMetricsByRegion(true);
327      scan.setCaching(2);
328      try (ResultScanner rs = table.getScanner(scan)) {
329        ScanMetrics scanMetrics = rs.getScanMetrics();
330        int rowsScanned = 0;
331        for (Result r : rs) {
332          assertFalse(r.isEmpty());
333          rowsScanned++;
334        }
335        assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
336        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
337        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
338          .entrySet()) {
339          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
340          Map<String, Long> metricsMap = entry.getValue();
341          // Remove millis between nexts metric as it is not deterministic
342          metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
343          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
344          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
345          assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
346          assertNotNull(scanMetricsRegionInfo.getServerName());
347          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
348          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
349          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
350          assertTrue(rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
351        }
352      }
353
354      // Assert on scan metrics by region
355      assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
356    } finally {
357      TEST_UTIL.deleteTable(tableName);
358    }
359  }
360
361  @TestTemplate
362  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
363    final int numThreads = 20;
364    Configuration conf = TEST_UTIL.getConfiguration();
365    // Handler count is 3 by default.
366    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
367      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
368    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
369    // times the handler count.
370    assertTrue(numThreads > 6 * handlerCount);
371    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
372    TableName tableName = TableName.valueOf(
373      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
374    AtomicLong totalScanRpcTime = new AtomicLong(0);
375    AtomicLong totalQueueWaitTime = new AtomicLong(0);
376    CountDownLatch latch = new CountDownLatch(numThreads);
377    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
378      TEST_UTIL.loadTable(table, CF);
379      for (int i = 0; i < numThreads; i++) {
380        executor.execute(new Runnable() {
381          @Override
382          public void run() {
383            try {
384              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
385              scan.setEnableScanMetricsByRegion(true);
386              scan.setCaching(2);
387              try (ResultScanner rs = table.getScanner(scan)) {
388                Result r;
389                while ((r = rs.next()) != null) {
390                  assertFalse(r.isEmpty());
391                }
392                ScanMetrics scanMetrics = rs.getScanMetrics();
393                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
394                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
395                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
396              }
397              latch.countDown();
398            } catch (IOException e) {
399              throw new RuntimeException(e);
400            }
401          }
402        });
403      }
404      latch.await();
405      executor.shutdown();
406      executor.awaitTermination(10, TimeUnit.SECONDS);
407      assertTrue(totalScanRpcTime.get() > 0);
408      assertTrue(totalQueueWaitTime.get() > 0);
409    } finally {
410      TEST_UTIL.deleteTable(tableName);
411    }
412  }
413
414  @TestTemplate
415  public void testScanMetricsByRegionWithRegionMove() throws Exception {
416    TableName tableName = TableName.valueOf(
417      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
418    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
419      TEST_UTIL.loadTable(table, CF);
420
421      // Scan 2 regions with start row keys: bbb and ccc
422      byte[] bbb = Bytes.toBytes("bbb");
423      byte[] ccc = Bytes.toBytes("ccc");
424      byte[] ddc = Bytes.toBytes("ddc");
425      long expectedCountOfRowsScannedInMovedRegion = 0;
426      // ROWS is the data loaded by loadTable()
427      for (byte[] row : HBaseTestingUtil.ROWS) {
428        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
429          expectedCountOfRowsScannedInMovedRegion++;
430        }
431      }
432      byte[] movedRegion = null;
433      ScanMetrics scanMetrics;
434
435      // Initialize scan with maxResultSize as size of 50 rows.
436      Scan scan = generateScan(bbb, ddc);
437      scan.setEnableScanMetricsByRegion(true);
438      scan.setMaxResultSize(8000);
439
440      try (ResultScanner rs = table.getScanner(scan)) {
441        boolean isFirstScanOfRegion = true;
442        for (Result r : rs) {
443          byte[] row = r.getRow();
444          if (isFirstScanOfRegion) {
445            movedRegion = moveRegion(tableName, row);
446            isFirstScanOfRegion = false;
447          }
448        }
449        assertNotNull(movedRegion);
450
451        scanMetrics = rs.getScanMetrics();
452        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
453          scanMetrics.collectMetricsByRegion();
454        long actualCountOfRowsScannedInMovedRegion = 0;
455        Set<ServerName> serversForMovedRegion = new HashSet<>();
456
457        // 2 regions scanned with two entries for first region as it moved in b/w scan
458        assertEquals(3, scanMetricsByRegion.size());
459        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
460          .entrySet()) {
461          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
462          Map<String, Long> metricsMap = entry.getValue();
463          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
464            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
465            actualCountOfRowsScannedInMovedRegion += rowsScanned;
466            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
467
468            assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
469          }
470          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
471        }
472        assertEquals(expectedCountOfRowsScannedInMovedRegion,
473          actualCountOfRowsScannedInMovedRegion);
474        assertEquals(2, serversForMovedRegion.size());
475      }
476    } finally {
477      TEST_UTIL.deleteTable(tableName);
478    }
479  }
480
481  @TestTemplate
482  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
483    TableName tableName = TableName.valueOf(
484      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
485    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
486      TEST_UTIL.loadTable(table, CF);
487
488      // Scan 1 region with start row key: bbb
489      byte[] bbb = Bytes.toBytes("bbb");
490      byte[] bmw = Bytes.toBytes("bmw");
491      byte[] ccb = Bytes.toBytes("ccb");
492      long expectedCountOfRowsScannedInRegion = 0;
493      // ROWS is the data loaded by loadTable()
494      for (byte[] row : HBaseTestingUtil.ROWS) {
495        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
496          expectedCountOfRowsScannedInRegion++;
497        }
498      }
499      ScanMetrics scanMetrics;
500      Set<String> expectedSplitRegionRes = new HashSet<>();
501
502      // Initialize scan
503      Scan scan = generateScan(bbb, ccb);
504      scan.setEnableScanMetricsByRegion(true);
505      scan.setMaxResultSize(8000);
506
507      try (ResultScanner rs = table.getScanner(scan)) {
508        boolean isFirstScanOfRegion = true;
509        while (rs.next() != null) {
510          if (isFirstScanOfRegion) {
511            splitRegion(tableName, bbb, bmw)
512              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
513            isFirstScanOfRegion = false;
514          }
515        }
516
517        scanMetrics = rs.getScanMetrics();
518        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
519          scanMetrics.collectMetricsByRegion();
520
521        long actualCountOfRowsScannedInRegion = 0;
522        long rpcRetiesCount = 0;
523        Set<String> splitRegionRes = new HashSet<>();
524
525        // 1 entry each for parent and two child regions
526        assertEquals(3, scanMetricsByRegion.size());
527        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
528          .entrySet()) {
529          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
530          Map<String, Long> metricsMap = entry.getValue();
531          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
532          actualCountOfRowsScannedInRegion += rowsScanned;
533          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
534
535          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
536            rpcRetiesCount++;
537          }
538
539          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
540        }
541        assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
542        assertEquals(2, rpcRetiesCount);
543        assertEquals(expectedSplitRegionRes, splitRegionRes);
544      }
545    } finally {
546      TEST_UTIL.deleteTable(tableName);
547    }
548  }
549
550  @TestTemplate
551  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
552    TableName tableName = TableName.valueOf(
553      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
554    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
555      TEST_UTIL.loadTable(table, CF);
556
557      // Scan 2 regions with start row keys: bbb and ccc
558      byte[] bbb = Bytes.toBytes("bbb");
559      byte[] ccc = Bytes.toBytes("ccc");
560      byte[] ddc = Bytes.toBytes("ddc");
561      long expectedCountOfRowsScannedInRegions = 0;
562      // ROWS is the data loaded by loadTable()
563      for (byte[] row : HBaseTestingUtil.ROWS) {
564        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
565          expectedCountOfRowsScannedInRegions++;
566        }
567      }
568      ScanMetrics scanMetrics;
569      Set<String> expectedMergeRegionsRes = new HashSet<>();
570      String mergedRegionEncodedName = null;
571
572      // Initialize scan
573      Scan scan = generateScan(bbb, ddc);
574      scan.setEnableScanMetricsByRegion(true);
575      scan.setMaxResultSize(8000);
576
577      try (ResultScanner rs = table.getScanner(scan)) {
578        boolean isFirstScanOfRegion = true;
579        while (rs.next() != null) {
580          if (isFirstScanOfRegion) {
581            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
582            // Entry with index 2 is the encoded region name of merged region
583            mergedRegionEncodedName = Bytes.toString(out.get(2));
584            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
585            isFirstScanOfRegion = false;
586          }
587        }
588
589        scanMetrics = rs.getScanMetrics();
590        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
591          scanMetrics.collectMetricsByRegion();
592        long actualCountOfRowsScannedInRegions = 0;
593        Set<String> mergeRegionsRes = new HashSet<>();
594        boolean containsMergedRegionInScanMetrics = false;
595
596        // 1 entry each for old region from which first row was scanned and new merged region
597        assertEquals(2, scanMetricsByRegion.size());
598        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
599          .entrySet()) {
600          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
601          Map<String, Long> metricsMap = entry.getValue();
602          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
603          actualCountOfRowsScannedInRegions += rowsScanned;
604          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
605          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
606            containsMergedRegionInScanMetrics = true;
607          }
608
609          assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
610          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
611        }
612        assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
613        assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
614        assertTrue(containsMergedRegionInScanMetrics);
615      }
616    } finally {
617      TEST_UTIL.deleteTable(tableName);
618    }
619  }
620
621  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
622    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
623    CountDownLatch latch) {
624    return new Runnable() {
625      public void run() {
626        try {
627          while (latch.getCount() > 0) {
628            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
629              scanMetrics.collectMetricsByRegion();
630            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
631            Thread.sleep(RAND.nextInt(10));
632          }
633        } catch (InterruptedException e) {
634          throw new RuntimeException(e);
635        }
636      }
637    };
638  }
639
640  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
641    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
642    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
643      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
644      Map<String, Long> metricsMap = entry.getValue();
645      // Remove millis between nexts metric as it is not deterministic
646      metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
647      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
648      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
649      if (dstMap.containsKey(scanMetricsRegionInfo)) {
650        Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
651        for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
652          String metricName = metricEntry.getKey();
653          Long existingValue = dstMetricsMap.get(metricName);
654          Long newValue = metricEntry.getValue();
655          dstMetricsMap.put(metricName, existingValue + newValue);
656        }
657      } else {
658        dstMap.put(scanMetricsRegionInfo, metricsMap);
659      }
660    }
661  }
662
663  /**
664   * Moves the region with start row key from its original region server to some other region
665   * server. This is a synchronous method.
666   * @param tableName Table name of region to be moved belongs.
667   * @param startRow  Start row key of the region to be moved.
668   * @return Encoded region name of the region which was moved.
669   */
670  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
671    Admin admin = TEST_UTIL.getAdmin();
672    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
673    HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
674    byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
675    ServerName initialServerName = loc.getServerName();
676
677    admin.move(encodedRegionName);
678
679    ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
680
681    // Assert that region actually moved
682    assertNotEquals(initialServerName, finalServerName);
683    return encodedRegionName;
684  }
685
686  /**
687   * Splits the region with start row key at the split key provided. This is a synchronous method.
688   * @param tableName Table name of region to be split.
689   * @param startRow  Start row key of the region to be split.
690   * @param splitKey  Split key for splitting the region.
691   * @return List of encoded region names with first element being parent region followed by two
692   *         child regions.
693   */
694  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
695    throws IOException {
696    Admin admin = TEST_UTIL.getAdmin();
697    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
698    HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
699    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
700    ServerName initialTopServerName = topLoc.getServerName();
701    HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
702    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
703    ServerName initialBottomServerName = bottomLoc.getServerName();
704
705    // Assert region is ready for split
706    assertEquals(initialTopServerName, initialBottomServerName);
707    assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
708
709    FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
710
711    topLoc = regionLocator.getRegionLocation(startRow, true);
712    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
713    bottomLoc = regionLocator.getRegionLocation(splitKey, true);
714    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
715
716    // Assert that region split is complete
717    assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
718    assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName);
719    assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
720
721    return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
722      finalEncodedBottomRegionName);
723  }
724
725  /**
726   * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
727   * regions to be merged are adjacent regions. This is a synchronous method.
728   * @param tableName    Table name of regions to be merged.
729   * @param topRegion    Start row key of first region for merging.
730   * @param bottomRegion Start row key of second region for merging.
731   * @return List of encoded region names with first two elements being original regions followed by
732   *         the merged region.
733   */
734  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
735    throws IOException {
736    Admin admin = TEST_UTIL.getAdmin();
737    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
738    HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
739    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
740    String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
741    HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
742    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
743    String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
744
745    // Assert that regions are ready to be merged
746    assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
747    assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
748
749    FutureUtils.get(admin.mergeRegionsAsync(
750      new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
751
752    topLoc = regionLocator.getRegionLocation(topRegion, true);
753    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
754    bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
755    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
756
757    // Assert regions have been merges successfully
758    assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
759    assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName);
760    assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
761
762    return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
763      finalEncodedTopRegionName);
764  }
765}