001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
024import static org.junit.Assert.assertEquals;
025
026import java.io.IOException;
027import java.util.Arrays;
028import java.util.HashMap;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Random;
033import java.util.Set;
034import java.util.concurrent.CountDownLatch;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadPoolExecutor;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HRegionLocation;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
044import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
045import org.apache.hadoop.hbase.testclassification.ClientTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.FutureUtils;
049import org.junit.AfterClass;
050import org.junit.Assert;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.runners.Parameterized.Parameter;
056import org.junit.runners.Parameterized.Parameters;
057
058@Category({ ClientTests.class, MediumTests.class })
059public class TestTableScanMetrics extends FromClientSideBase {
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestTableScanMetrics.class);
063
064  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
065
066  private static final TableName TABLE_NAME =
067    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
068
069  private static final byte[] CF = Bytes.toBytes("cf");
070
071  private static final byte[] CQ = Bytes.toBytes("cq");
072
073  private static final byte[] VALUE = Bytes.toBytes("value");
074
075  private static final Random RAND = new Random(11);
076
077  private static int NUM_REGIONS;
078
079  private static Connection CONN;
080
081  @Parameters(name = "{index}: scanner={0}")
082  public static List<Object[]> params() {
083    return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
084      new Object[] { "ReverseScanner", new Scan().setReversed(true) });
085  }
086
087  @Parameter(0)
088  public String scannerName;
089
090  @Parameter(1)
091  public Scan originalScan;
092
093  @BeforeClass
094  public static void setUp() throws Exception {
095    // Start the minicluster
096    TEST_UTIL.startMiniCluster(2);
097    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
098    // scan hits all the region and not all rows lie in a single region
099    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
100      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
101        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
102        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
103    }
104    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
105    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
106  }
107
108  @AfterClass
109  public static void tearDown() throws Exception {
110    TEST_UTIL.shutdownMiniCluster();
111  }
112
113  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
114    Scan scan = new Scan(originalScan);
115    if (originalScan.isReversed()) {
116      scan.withStartRow(largerRow, true);
117      scan.withStopRow(smallerRow, true);
118    } else {
119      scan.withStartRow(smallerRow, true);
120      scan.withStopRow(largerRow, true);
121    }
122    return scan;
123  }
124
125  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
126    throws IOException {
127    int countOfRows = 0;
128    ScanMetrics scanMetrics;
129    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
130      for (Result result : scanner) {
131        Assert.assertFalse(result.isEmpty());
132        countOfRows++;
133      }
134      scanMetrics = scanner.getScanMetrics();
135    }
136    Assert.assertEquals(expectedCount, countOfRows);
137    return scanMetrics;
138  }
139
140  @Test
141  public void testScanMetricsDisabled() throws Exception {
142    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
143    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
144    Assert.assertNull(scanMetrics);
145  }
146
147  @Test
148  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
149    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
150    scan.setScanMetricsEnabled(true);
151    int expectedRowsScanned = 3;
152    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
153    Assert.assertNotNull(scanMetrics);
154    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
155    // The test setup is such that we have 1 row per region in the scan range
156    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
157    Assert.assertEquals(expectedRowsScanned,
158      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
159    Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
160  }
161
162  @Test
163  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
164    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
165    scan.setScanMetricsEnabled(true);
166    int expectedRowsScanned = 3;
167    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
168    Assert.assertNotNull(scanMetrics);
169    // By default counters are collected with reset as true
170    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
171    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
172    Assert.assertEquals(expectedRowsScanned,
173      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
174    // Subsequent call to get scan metrics map should show all counters as 0
175    Assert.assertEquals(0, scanMetrics.countOfRegions.get());
176    Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
177  }
178
179  @Test
180  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
181    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
182    scan.setEnableScanMetricsByRegion(true);
183    int expectedRowsScanned = 1;
184    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
185    Assert.assertNotNull(scanMetrics);
186    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
187    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
188    Assert.assertEquals(expectedRowsScanned,
189      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
190    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
191      scanMetrics.collectMetricsByRegion(false);
192    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
193    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
194      .entrySet()) {
195      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
196      metricsMap = entry.getValue();
197      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
198      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
199      // As we are scanning single row so, overall scan metrics will match per region scan metrics
200      Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
201      Assert.assertEquals(expectedRowsScanned,
202        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
203    }
204  }
205
206  @Test
207  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
208    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
209    scan.setEnableScanMetricsByRegion(true);
210    int expectedRowsScanned = 3;
211    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
212    Assert.assertNotNull(scanMetrics);
213    Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
214    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
215    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
216      scanMetrics.collectMetricsByRegion(false);
217    Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
218    int rowsScannedAcrossAllRegions = 0;
219    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
220      .entrySet()) {
221      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
222      Map<String, Long> metricsMap = entry.getValue();
223      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
224      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
225      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
226      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
227        rowsScannedAcrossAllRegions++;
228      } else {
229        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
230      }
231    }
232    Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
233  }
234
235  @Test
236  public void testScanMetricsByRegionReset() throws Exception {
237    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
238    scan.setEnableScanMetricsByRegion(true);
239    int expectedRowsScanned = 3;
240    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
241    Assert.assertNotNull(scanMetrics);
242
243    // Retrieve scan metrics by region as a map and reset
244    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
245      scanMetrics.collectMetricsByRegion();
246    // We scan 1 row per region
247    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
248    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
249      .entrySet()) {
250      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
251      Map<String, Long> metricsMap = entry.getValue();
252      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
253      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
254      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
255      Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
256    }
257
258    // Scan metrics have already been reset and now all counters should be 0
259    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
260    // Size of map should be same as earlier
261    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
262    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
263      .entrySet()) {
264      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
265      Map<String, Long> metricsMap = entry.getValue();
266      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
267      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
268      // Counters should have been reset to 0
269      Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
270      Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
271    }
272  }
273
274  @Test
275  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
276    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
277    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
278      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
279    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
280      TEST_UTIL.loadTable(table, CF);
281
282      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
283
284      // Trigger two concurrent threads one of which scans the table and other periodically
285      // collects the scan metrics (along with resetting the counters to 0).
286      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
287      scan.setEnableScanMetricsByRegion(true);
288      scan.setCaching(2);
289      try (ResultScanner rs = table.getScanner(scan)) {
290        ScanMetrics scanMetrics = rs.getScanMetrics();
291        AtomicInteger rowsScanned = new AtomicInteger(0);
292        CountDownLatch latch = new CountDownLatch(1);
293        Runnable tableScanner = new Runnable() {
294          public void run() {
295            for (Result r : rs) {
296              Assert.assertFalse(r.isEmpty());
297              rowsScanned.incrementAndGet();
298            }
299            latch.countDown();
300          }
301        };
302        Runnable metricsCollector =
303          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
304        executor.execute(tableScanner);
305        executor.execute(metricsCollector);
306        latch.await();
307        // Merge leftover scan metrics
308        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
309          concurrentScanMetricsByRegion);
310        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
311      }
312
313      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
314
315      // Collect scan metrics by region from single thread. Assert that concurrent scan
316      // and metrics collection works as expected.
317      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
318      scan.setEnableScanMetricsByRegion(true);
319      scan.setCaching(2);
320      try (ResultScanner rs = table.getScanner(scan)) {
321        ScanMetrics scanMetrics = rs.getScanMetrics();
322        int rowsScanned = 0;
323        for (Result r : rs) {
324          Assert.assertFalse(r.isEmpty());
325          rowsScanned++;
326        }
327        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
328        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
329        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
330          .entrySet()) {
331          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
332          Map<String, Long> metricsMap = entry.getValue();
333          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
334          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
335          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
336          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
337          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
338          Assert.assertTrue(
339            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
340        }
341      }
342
343      // Assert on scan metrics by region
344      Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
345    } finally {
346      TEST_UTIL.deleteTable(tableName);
347    }
348  }
349
350  @Test
351  public void testScanMetricsByRegionWithRegionMove() throws Exception {
352    TableName tableName = TableName.valueOf(
353      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
354    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
355      TEST_UTIL.loadTable(table, CF);
356
357      // Scan 2 regions with start row keys: bbb and ccc
358      byte[] bbb = Bytes.toBytes("bbb");
359      byte[] ccc = Bytes.toBytes("ccc");
360      byte[] ddc = Bytes.toBytes("ddc");
361      long expectedCountOfRowsScannedInMovedRegion = 0;
362      // ROWS is the data loaded by loadTable()
363      for (byte[] row : HBaseTestingUtil.ROWS) {
364        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
365          expectedCountOfRowsScannedInMovedRegion++;
366        }
367      }
368      byte[] movedRegion = null;
369      ScanMetrics scanMetrics;
370
371      // Initialize scan with maxResultSize as size of 50 rows.
372      Scan scan = generateScan(bbb, ddc);
373      scan.setEnableScanMetricsByRegion(true);
374      scan.setMaxResultSize(8000);
375
376      try (ResultScanner rs = table.getScanner(scan)) {
377        boolean isFirstScanOfRegion = true;
378        for (Result r : rs) {
379          byte[] row = r.getRow();
380          if (isFirstScanOfRegion) {
381            movedRegion = moveRegion(tableName, row);
382            isFirstScanOfRegion = false;
383          }
384        }
385        Assert.assertNotNull(movedRegion);
386
387        scanMetrics = rs.getScanMetrics();
388        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
389          scanMetrics.collectMetricsByRegion();
390        long actualCountOfRowsScannedInMovedRegion = 0;
391        Set<ServerName> serversForMovedRegion = new HashSet<>();
392
393        // 2 regions scanned with two entries for first region as it moved in b/w scan
394        Assert.assertEquals(3, scanMetricsByRegion.size());
395        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
396          .entrySet()) {
397          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
398          Map<String, Long> metricsMap = entry.getValue();
399          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
400            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
401            actualCountOfRowsScannedInMovedRegion += rowsScanned;
402            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
403
404            Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
405          }
406          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
407        }
408        Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
409          actualCountOfRowsScannedInMovedRegion);
410        Assert.assertEquals(2, serversForMovedRegion.size());
411      }
412    } finally {
413      TEST_UTIL.deleteTable(tableName);
414    }
415  }
416
417  @Test
418  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
419    TableName tableName = TableName.valueOf(
420      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
421    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
422      TEST_UTIL.loadTable(table, CF);
423
424      // Scan 1 region with start row key: bbb
425      byte[] bbb = Bytes.toBytes("bbb");
426      byte[] bmw = Bytes.toBytes("bmw");
427      byte[] ccb = Bytes.toBytes("ccb");
428      long expectedCountOfRowsScannedInRegion = 0;
429      // ROWS is the data loaded by loadTable()
430      for (byte[] row : HBaseTestingUtil.ROWS) {
431        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
432          expectedCountOfRowsScannedInRegion++;
433        }
434      }
435      ScanMetrics scanMetrics;
436      Set<String> expectedSplitRegionRes = new HashSet<>();
437
438      // Initialize scan
439      Scan scan = generateScan(bbb, ccb);
440      scan.setEnableScanMetricsByRegion(true);
441      scan.setMaxResultSize(8000);
442
443      try (ResultScanner rs = table.getScanner(scan)) {
444        boolean isFirstScanOfRegion = true;
445        for (Result r : rs) {
446          if (isFirstScanOfRegion) {
447            splitRegion(tableName, bbb, bmw)
448              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
449            isFirstScanOfRegion = false;
450          }
451        }
452
453        scanMetrics = rs.getScanMetrics();
454        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
455          scanMetrics.collectMetricsByRegion();
456
457        long actualCountOfRowsScannedInRegion = 0;
458        long rpcRetiesCount = 0;
459        Set<String> splitRegionRes = new HashSet<>();
460
461        // 1 entry each for parent and two child regions
462        Assert.assertEquals(3, scanMetricsByRegion.size());
463        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
464          .entrySet()) {
465          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
466          Map<String, Long> metricsMap = entry.getValue();
467          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
468          actualCountOfRowsScannedInRegion += rowsScanned;
469          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
470
471          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
472            rpcRetiesCount++;
473          }
474
475          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
476        }
477        Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
478        Assert.assertEquals(2, rpcRetiesCount);
479        Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
480      }
481    } finally {
482      TEST_UTIL.deleteTable(tableName);
483    }
484  }
485
486  @Test
487  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
488    TableName tableName = TableName.valueOf(
489      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
490    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
491      TEST_UTIL.loadTable(table, CF);
492
493      // Scan 2 regions with start row keys: bbb and ccc
494      byte[] bbb = Bytes.toBytes("bbb");
495      byte[] ccc = Bytes.toBytes("ccc");
496      byte[] ddc = Bytes.toBytes("ddc");
497      long expectedCountOfRowsScannedInRegions = 0;
498      // ROWS is the data loaded by loadTable()
499      for (byte[] row : HBaseTestingUtil.ROWS) {
500        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
501          expectedCountOfRowsScannedInRegions++;
502        }
503      }
504      ScanMetrics scanMetrics;
505      Set<String> expectedMergeRegionsRes = new HashSet<>();
506      String mergedRegionEncodedName = null;
507
508      // Initialize scan
509      Scan scan = generateScan(bbb, ddc);
510      scan.setEnableScanMetricsByRegion(true);
511      scan.setMaxResultSize(8000);
512
513      try (ResultScanner rs = table.getScanner(scan)) {
514        boolean isFirstScanOfRegion = true;
515        for (Result r : rs) {
516          if (isFirstScanOfRegion) {
517            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
518            // Entry with index 2 is the encoded region name of merged region
519            mergedRegionEncodedName = Bytes.toString(out.get(2));
520            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
521            isFirstScanOfRegion = false;
522          }
523        }
524
525        scanMetrics = rs.getScanMetrics();
526        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
527          scanMetrics.collectMetricsByRegion();
528        long actualCountOfRowsScannedInRegions = 0;
529        Set<String> mergeRegionsRes = new HashSet<>();
530        boolean containsMergedRegionInScanMetrics = false;
531
532        // 1 entry each for old region from which first row was scanned and new merged region
533        Assert.assertEquals(2, scanMetricsByRegion.size());
534        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
535          .entrySet()) {
536          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
537          Map<String, Long> metricsMap = entry.getValue();
538          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
539          actualCountOfRowsScannedInRegions += rowsScanned;
540          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
541          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
542            containsMergedRegionInScanMetrics = true;
543          }
544
545          Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
546          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
547        }
548        Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
549        Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
550        Assert.assertTrue(containsMergedRegionInScanMetrics);
551      }
552    } finally {
553      TEST_UTIL.deleteTable(tableName);
554    }
555  }
556
557  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
558    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
559    CountDownLatch latch) {
560    return new Runnable() {
561      public void run() {
562        try {
563          while (latch.getCount() > 0) {
564            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
565              scanMetrics.collectMetricsByRegion();
566            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
567            Thread.sleep(RAND.nextInt(10));
568          }
569        } catch (InterruptedException e) {
570          throw new RuntimeException(e);
571        }
572      }
573    };
574  }
575
576  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
577    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
578    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
579      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
580      Map<String, Long> metricsMap = entry.getValue();
581      if (dstMap.containsKey(scanMetricsRegionInfo)) {
582        Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
583        for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
584          String metricName = metricEntry.getKey();
585          Long existingValue = dstMetricsMap.get(metricName);
586          Long newValue = metricEntry.getValue();
587          dstMetricsMap.put(metricName, existingValue + newValue);
588        }
589      } else {
590        dstMap.put(scanMetricsRegionInfo, metricsMap);
591      }
592    }
593  }
594
595  /**
596   * Moves the region with start row key from its original region server to some other region
597   * server. This is a synchronous method.
598   * @param tableName Table name of region to be moved belongs.
599   * @param startRow  Start row key of the region to be moved.
600   * @return Encoded region name of the region which was moved.
601   */
602  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
603    Admin admin = TEST_UTIL.getAdmin();
604    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
605    HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
606    byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
607    ServerName initialServerName = loc.getServerName();
608
609    admin.move(encodedRegionName);
610
611    ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
612
613    // Assert that region actually moved
614    Assert.assertNotEquals(initialServerName, finalServerName);
615    return encodedRegionName;
616  }
617
618  /**
619   * Splits the region with start row key at the split key provided. This is a synchronous method.
620   * @param tableName Table name of region to be split.
621   * @param startRow  Start row key of the region to be split.
622   * @param splitKey  Split key for splitting the region.
623   * @return List of encoded region names with first element being parent region followed by two
624   *         child regions.
625   */
626  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
627    throws IOException {
628    Admin admin = TEST_UTIL.getAdmin();
629    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
630    HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
631    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
632    ServerName initialTopServerName = topLoc.getServerName();
633    HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
634    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
635    ServerName initialBottomServerName = bottomLoc.getServerName();
636
637    // Assert region is ready for split
638    Assert.assertEquals(initialTopServerName, initialBottomServerName);
639    Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
640
641    FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
642
643    topLoc = regionLocator.getRegionLocation(startRow, true);
644    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
645    bottomLoc = regionLocator.getRegionLocation(splitKey, true);
646    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
647
648    // Assert that region split is complete
649    Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
650    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName);
651    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
652
653    return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
654      finalEncodedBottomRegionName);
655  }
656
657  /**
658   * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
659   * regions to be merged are adjacent regions. This is a synchronous method.
660   * @param tableName    Table name of regions to be merged.
661   * @param topRegion    Start row key of first region for merging.
662   * @param bottomRegion Start row key of second region for merging.
663   * @return List of encoded region names with first two elements being original regions followed by
664   *         the merged region.
665   */
666  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
667    throws IOException {
668    Admin admin = TEST_UTIL.getAdmin();
669    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
670    HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
671    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
672    String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
673    HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
674    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
675    String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
676
677    // Assert that regions are ready to be merged
678    Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
679    Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
680
681    FutureUtils.get(admin.mergeRegionsAsync(
682      new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
683
684    topLoc = regionLocator.getRegionLocation(topRegion, true);
685    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
686    bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
687    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
688
689    // Assert regions have been merges successfully
690    Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
691    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName);
692    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
693
694    return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
695      finalEncodedTopRegionName);
696  }
697}