001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
025import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
026import static org.junit.Assert.assertEquals;
027
028import java.io.IOException;
029import java.util.Arrays;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.concurrent.CountDownLatch;
037import java.util.concurrent.Executors;
038import java.util.concurrent.ThreadPoolExecutor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicInteger;
041import java.util.concurrent.atomic.AtomicLong;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.HRegionLocation;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
050import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
051import org.apache.hadoop.hbase.testclassification.ClientTests;
052import org.apache.hadoop.hbase.testclassification.LargeTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.FutureUtils;
055import org.junit.AfterClass;
056import org.junit.Assert;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.runners.Parameterized.Parameter;
062import org.junit.runners.Parameterized.Parameters;
063
064@Category({ ClientTests.class, LargeTests.class })
065public class TestTableScanMetrics extends FromClientSideBase {
  /** JUnit class rule built for this test class via {@code HBaseClassTestRule.forClass}. */
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableScanMetrics.class);

  /** Testing utility that owns the mini cluster started in {@code setUp()}. */
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  /** Name of the shared three-row table, derived from this class's simple name. */
  private static final TableName TABLE_NAME =
    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());

  /** Column family used by every table created in this class. */
  private static final byte[] CF = Bytes.toBytes("cf");

  /** Column qualifier written for the rows of the shared table. */
  private static final byte[] CQ = Bytes.toBytes("cq");

  /** Cell value written for the rows of the shared table. */
  private static final byte[] VALUE = Bytes.toBytes("value");

  /** Fixed-seed random source; used for the sleep interval of the periodic metrics collector. */
  private static final Random RAND = new Random(11);

  /** Number of regions of {@link #TABLE_NAME}; assigned in {@code setUp()}. */
  private static int NUM_REGIONS;

  /** Connection created in {@code setUp()} from the test utility's configuration. */
  private static Connection CONN;
086
087  @Parameters(name = "{index}: scanner={0}")
088  public static List<Object[]> params() {
089    return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
090      new Object[] { "ReverseScanner", new Scan().setReversed(true) });
091  }
092
  /** First parameter: the label substituted for {0} in the {@code @Parameters} name pattern. */
  @Parameter(0)
  public String scannerName;

  /** Second parameter: the template scan that {@code generateScan} copies for each test. */
  @Parameter(1)
  public Scan originalScan;
098
099  @BeforeClass
100  public static void setUp() throws Exception {
101    // Start the minicluster
102    TEST_UTIL.startMiniCluster(2);
103    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
104    // scan hits all the region and not all rows lie in a single region
105    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
106      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
107        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
108        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
109    }
110    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
111    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
112  }
113
114  @AfterClass
115  public static void tearDown() throws Exception {
116    TEST_UTIL.shutdownMiniCluster();
117  }
118
119  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
120    Scan scan = new Scan(originalScan);
121    if (originalScan.isReversed()) {
122      scan.withStartRow(largerRow, true);
123      scan.withStopRow(smallerRow, true);
124    } else {
125      scan.withStartRow(smallerRow, true);
126      scan.withStopRow(largerRow, true);
127    }
128    return scan;
129  }
130
131  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
132    throws IOException {
133    int countOfRows = 0;
134    ScanMetrics scanMetrics;
135    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
136      for (Result result : scanner) {
137        Assert.assertFalse(result.isEmpty());
138        countOfRows++;
139      }
140      scanMetrics = scanner.getScanMetrics();
141    }
142    Assert.assertEquals(expectedCount, countOfRows);
143    return scanMetrics;
144  }
145
146  @Test
147  public void testScanMetricsDisabled() throws Exception {
148    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
149    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
150    Assert.assertNull(scanMetrics);
151  }
152
153  @Test
154  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
155    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
156    scan.setScanMetricsEnabled(true);
157    int expectedRowsScanned = 3;
158    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
159    Assert.assertNotNull(scanMetrics);
160    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
161    // The test setup is such that we have 1 row per region in the scan range
162    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
163    Assert.assertEquals(expectedRowsScanned,
164      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
165    Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
166  }
167
168  @Test
169  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
170    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
171    scan.setScanMetricsEnabled(true);
172    int expectedRowsScanned = 3;
173    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
174    Assert.assertNotNull(scanMetrics);
175    // By default counters are collected with reset as true
176    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
177    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
178    Assert.assertEquals(expectedRowsScanned,
179      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
180    // Subsequent call to get scan metrics map should show all counters as 0
181    Assert.assertEquals(0, scanMetrics.countOfRegions.get());
182    Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
183  }
184
185  @Test
186  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
187    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
188    scan.setEnableScanMetricsByRegion(true);
189    int expectedRowsScanned = 1;
190    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
191    Assert.assertNotNull(scanMetrics);
192    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
193    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
194    Assert.assertEquals(expectedRowsScanned,
195      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
196    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
197      scanMetrics.collectMetricsByRegion(false);
198    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
199    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
200      .entrySet()) {
201      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
202      metricsMap = entry.getValue();
203      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
204      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
205      // As we are scanning single row so, overall scan metrics will match per region scan metrics
206      Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
207      Assert.assertEquals(expectedRowsScanned,
208        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
209    }
210  }
211
212  @Test
213  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
214    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
215    scan.setEnableScanMetricsByRegion(true);
216    int expectedRowsScanned = 3;
217    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
218    Assert.assertNotNull(scanMetrics);
219    Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
220    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
221    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
222      scanMetrics.collectMetricsByRegion(false);
223    Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
224    int rowsScannedAcrossAllRegions = 0;
225    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
226      .entrySet()) {
227      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
228      Map<String, Long> metricsMap = entry.getValue();
229      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
230      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
231      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
232      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
233        rowsScannedAcrossAllRegions++;
234      } else {
235        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
236      }
237    }
238    Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
239  }
240
241  @Test
242  public void testScanMetricsByRegionReset() throws Exception {
243    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
244    scan.setEnableScanMetricsByRegion(true);
245    int expectedRowsScanned = 3;
246    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
247    Assert.assertNotNull(scanMetrics);
248
249    // Retrieve scan metrics by region as a map and reset
250    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
251      scanMetrics.collectMetricsByRegion();
252    // We scan 1 row per region
253    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
254    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
255      .entrySet()) {
256      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
257      Map<String, Long> metricsMap = entry.getValue();
258      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
259      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
260      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
261      Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
262    }
263
264    // Scan metrics have already been reset and now all counters should be 0
265    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
266    // Size of map should be same as earlier
267    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
268    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
269      .entrySet()) {
270      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
271      Map<String, Long> metricsMap = entry.getValue();
272      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
273      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
274      // Counters should have been reset to 0
275      Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
276      Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
277    }
278  }
279
280  @Test
281  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
282    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
283    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
284      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
285    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
286      TEST_UTIL.loadTable(table, CF);
287
288      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
289
290      // Trigger two concurrent threads one of which scans the table and other periodically
291      // collects the scan metrics (along with resetting the counters to 0).
292      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
293      scan.setEnableScanMetricsByRegion(true);
294      scan.setCaching(2);
295      try (ResultScanner rs = table.getScanner(scan)) {
296        ScanMetrics scanMetrics = rs.getScanMetrics();
297        AtomicInteger rowsScanned = new AtomicInteger(0);
298        CountDownLatch latch = new CountDownLatch(1);
299        Runnable tableScanner = new Runnable() {
300          public void run() {
301            for (Result r : rs) {
302              Assert.assertFalse(r.isEmpty());
303              rowsScanned.incrementAndGet();
304            }
305            latch.countDown();
306          }
307        };
308        Runnable metricsCollector =
309          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
310        executor.execute(tableScanner);
311        executor.execute(metricsCollector);
312        latch.await();
313        // Merge leftover scan metrics
314        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
315          concurrentScanMetricsByRegion);
316        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
317      }
318
319      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
320
321      // Collect scan metrics by region from single thread. Assert that concurrent scan
322      // and metrics collection works as expected.
323      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
324      scan.setEnableScanMetricsByRegion(true);
325      scan.setCaching(2);
326      try (ResultScanner rs = table.getScanner(scan)) {
327        ScanMetrics scanMetrics = rs.getScanMetrics();
328        int rowsScanned = 0;
329        for (Result r : rs) {
330          Assert.assertFalse(r.isEmpty());
331          rowsScanned++;
332        }
333        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
334        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
335        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
336          .entrySet()) {
337          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
338          Map<String, Long> metricsMap = entry.getValue();
339          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
340          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
341          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
342          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
343          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
344          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
345          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
346          Assert.assertTrue(
347            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
348        }
349      }
350
351      // Assert on scan metrics by region
352      Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
353    } finally {
354      TEST_UTIL.deleteTable(tableName);
355    }
356  }
357
358  @Test
359  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
360    final int numThreads = 20;
361    Configuration conf = TEST_UTIL.getConfiguration();
362    // Handler count is 3 by default.
363    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
364      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
365    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
366    // times the handler count.
367    Assert.assertTrue(numThreads > 6 * handlerCount);
368    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
369    TableName tableName = TableName.valueOf(
370      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
371    AtomicLong totalScanRpcTime = new AtomicLong(0);
372    AtomicLong totalQueueWaitTime = new AtomicLong(0);
373    CountDownLatch latch = new CountDownLatch(numThreads);
374    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
375      TEST_UTIL.loadTable(table, CF);
376      for (int i = 0; i < numThreads; i++) {
377        executor.execute(new Runnable() {
378          @Override
379          public void run() {
380            try {
381              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
382              scan.setEnableScanMetricsByRegion(true);
383              scan.setCaching(2);
384              try (ResultScanner rs = table.getScanner(scan)) {
385                Result r;
386                while ((r = rs.next()) != null) {
387                  Assert.assertFalse(r.isEmpty());
388                }
389                ScanMetrics scanMetrics = rs.getScanMetrics();
390                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
391                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
392                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
393              }
394              latch.countDown();
395            } catch (IOException e) {
396              throw new RuntimeException(e);
397            }
398          }
399        });
400      }
401      latch.await();
402      executor.shutdown();
403      executor.awaitTermination(10, TimeUnit.SECONDS);
404      Assert.assertTrue(totalScanRpcTime.get() > 0);
405      Assert.assertTrue(totalQueueWaitTime.get() > 0);
406    } finally {
407      TEST_UTIL.deleteTable(tableName);
408    }
409  }
410
411  @Test
412  public void testScanMetricsByRegionWithRegionMove() throws Exception {
413    TableName tableName = TableName.valueOf(
414      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
415    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
416      TEST_UTIL.loadTable(table, CF);
417
418      // Scan 2 regions with start row keys: bbb and ccc
419      byte[] bbb = Bytes.toBytes("bbb");
420      byte[] ccc = Bytes.toBytes("ccc");
421      byte[] ddc = Bytes.toBytes("ddc");
422      long expectedCountOfRowsScannedInMovedRegion = 0;
423      // ROWS is the data loaded by loadTable()
424      for (byte[] row : HBaseTestingUtil.ROWS) {
425        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
426          expectedCountOfRowsScannedInMovedRegion++;
427        }
428      }
429      byte[] movedRegion = null;
430      ScanMetrics scanMetrics;
431
432      // Initialize scan with maxResultSize as size of 50 rows.
433      Scan scan = generateScan(bbb, ddc);
434      scan.setEnableScanMetricsByRegion(true);
435      scan.setMaxResultSize(8000);
436
437      try (ResultScanner rs = table.getScanner(scan)) {
438        boolean isFirstScanOfRegion = true;
439        for (Result r : rs) {
440          byte[] row = r.getRow();
441          if (isFirstScanOfRegion) {
442            movedRegion = moveRegion(tableName, row);
443            isFirstScanOfRegion = false;
444          }
445        }
446        Assert.assertNotNull(movedRegion);
447
448        scanMetrics = rs.getScanMetrics();
449        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
450          scanMetrics.collectMetricsByRegion();
451        long actualCountOfRowsScannedInMovedRegion = 0;
452        Set<ServerName> serversForMovedRegion = new HashSet<>();
453
454        // 2 regions scanned with two entries for first region as it moved in b/w scan
455        Assert.assertEquals(3, scanMetricsByRegion.size());
456        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
457          .entrySet()) {
458          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
459          Map<String, Long> metricsMap = entry.getValue();
460          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
461            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
462            actualCountOfRowsScannedInMovedRegion += rowsScanned;
463            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
464
465            Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
466          }
467          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
468        }
469        Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
470          actualCountOfRowsScannedInMovedRegion);
471        Assert.assertEquals(2, serversForMovedRegion.size());
472      }
473    } finally {
474      TEST_UTIL.deleteTable(tableName);
475    }
476  }
477
478  @Test
479  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
480    TableName tableName = TableName.valueOf(
481      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
482    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
483      TEST_UTIL.loadTable(table, CF);
484
485      // Scan 1 region with start row key: bbb
486      byte[] bbb = Bytes.toBytes("bbb");
487      byte[] bmw = Bytes.toBytes("bmw");
488      byte[] ccb = Bytes.toBytes("ccb");
489      long expectedCountOfRowsScannedInRegion = 0;
490      // ROWS is the data loaded by loadTable()
491      for (byte[] row : HBaseTestingUtil.ROWS) {
492        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
493          expectedCountOfRowsScannedInRegion++;
494        }
495      }
496      ScanMetrics scanMetrics;
497      Set<String> expectedSplitRegionRes = new HashSet<>();
498
499      // Initialize scan
500      Scan scan = generateScan(bbb, ccb);
501      scan.setEnableScanMetricsByRegion(true);
502      scan.setMaxResultSize(8000);
503
504      try (ResultScanner rs = table.getScanner(scan)) {
505        boolean isFirstScanOfRegion = true;
506        for (Result r : rs) {
507          if (isFirstScanOfRegion) {
508            splitRegion(tableName, bbb, bmw)
509              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
510            isFirstScanOfRegion = false;
511          }
512        }
513
514        scanMetrics = rs.getScanMetrics();
515        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
516          scanMetrics.collectMetricsByRegion();
517
518        long actualCountOfRowsScannedInRegion = 0;
519        long rpcRetiesCount = 0;
520        Set<String> splitRegionRes = new HashSet<>();
521
522        // 1 entry each for parent and two child regions
523        Assert.assertEquals(3, scanMetricsByRegion.size());
524        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
525          .entrySet()) {
526          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
527          Map<String, Long> metricsMap = entry.getValue();
528          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
529          actualCountOfRowsScannedInRegion += rowsScanned;
530          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
531
532          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
533            rpcRetiesCount++;
534          }
535
536          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
537        }
538        Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
539        Assert.assertEquals(2, rpcRetiesCount);
540        Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
541      }
542    } finally {
543      TEST_UTIL.deleteTable(tableName);
544    }
545  }
546
547  @Test
548  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
549    TableName tableName = TableName.valueOf(
550      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
551    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
552      TEST_UTIL.loadTable(table, CF);
553
554      // Scan 2 regions with start row keys: bbb and ccc
555      byte[] bbb = Bytes.toBytes("bbb");
556      byte[] ccc = Bytes.toBytes("ccc");
557      byte[] ddc = Bytes.toBytes("ddc");
558      long expectedCountOfRowsScannedInRegions = 0;
559      // ROWS is the data loaded by loadTable()
560      for (byte[] row : HBaseTestingUtil.ROWS) {
561        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
562          expectedCountOfRowsScannedInRegions++;
563        }
564      }
565      ScanMetrics scanMetrics;
566      Set<String> expectedMergeRegionsRes = new HashSet<>();
567      String mergedRegionEncodedName = null;
568
569      // Initialize scan
570      Scan scan = generateScan(bbb, ddc);
571      scan.setEnableScanMetricsByRegion(true);
572      scan.setMaxResultSize(8000);
573
574      try (ResultScanner rs = table.getScanner(scan)) {
575        boolean isFirstScanOfRegion = true;
576        for (Result r : rs) {
577          if (isFirstScanOfRegion) {
578            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
579            // Entry with index 2 is the encoded region name of merged region
580            mergedRegionEncodedName = Bytes.toString(out.get(2));
581            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
582            isFirstScanOfRegion = false;
583          }
584        }
585
586        scanMetrics = rs.getScanMetrics();
587        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
588          scanMetrics.collectMetricsByRegion();
589        long actualCountOfRowsScannedInRegions = 0;
590        Set<String> mergeRegionsRes = new HashSet<>();
591        boolean containsMergedRegionInScanMetrics = false;
592
593        // 1 entry each for old region from which first row was scanned and new merged region
594        Assert.assertEquals(2, scanMetricsByRegion.size());
595        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
596          .entrySet()) {
597          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
598          Map<String, Long> metricsMap = entry.getValue();
599          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
600          actualCountOfRowsScannedInRegions += rowsScanned;
601          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
602          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
603            containsMergedRegionInScanMetrics = true;
604          }
605
606          Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
607          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
608        }
609        Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
610        Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
611        Assert.assertTrue(containsMergedRegionInScanMetrics);
612      }
613    } finally {
614      TEST_UTIL.deleteTable(tableName);
615    }
616  }
617
618  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
619    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
620    CountDownLatch latch) {
621    return new Runnable() {
622      public void run() {
623        try {
624          while (latch.getCount() > 0) {
625            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
626              scanMetrics.collectMetricsByRegion();
627            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
628            Thread.sleep(RAND.nextInt(10));
629          }
630        } catch (InterruptedException e) {
631          throw new RuntimeException(e);
632        }
633      }
634    };
635  }
636
637  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
638    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
639    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
640      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
641      Map<String, Long> metricsMap = entry.getValue();
642      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
643      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
644      if (dstMap.containsKey(scanMetricsRegionInfo)) {
645        Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
646        for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
647          String metricName = metricEntry.getKey();
648          Long existingValue = dstMetricsMap.get(metricName);
649          Long newValue = metricEntry.getValue();
650          dstMetricsMap.put(metricName, existingValue + newValue);
651        }
652      } else {
653        dstMap.put(scanMetricsRegionInfo, metricsMap);
654      }
655    }
656  }
657
658  /**
659   * Moves the region with start row key from its original region server to some other region
660   * server. This is a synchronous method.
661   * @param tableName Table name of region to be moved belongs.
662   * @param startRow  Start row key of the region to be moved.
663   * @return Encoded region name of the region which was moved.
664   */
665  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
666    Admin admin = TEST_UTIL.getAdmin();
667    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
668    HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
669    byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
670    ServerName initialServerName = loc.getServerName();
671
672    admin.move(encodedRegionName);
673
674    ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
675
676    // Assert that region actually moved
677    Assert.assertNotEquals(initialServerName, finalServerName);
678    return encodedRegionName;
679  }
680
681  /**
682   * Splits the region with start row key at the split key provided. This is a synchronous method.
683   * @param tableName Table name of region to be split.
684   * @param startRow  Start row key of the region to be split.
685   * @param splitKey  Split key for splitting the region.
686   * @return List of encoded region names with first element being parent region followed by two
687   *         child regions.
688   */
689  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
690    throws IOException {
691    Admin admin = TEST_UTIL.getAdmin();
692    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
693    HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
694    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
695    ServerName initialTopServerName = topLoc.getServerName();
696    HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
697    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
698    ServerName initialBottomServerName = bottomLoc.getServerName();
699
700    // Assert region is ready for split
701    Assert.assertEquals(initialTopServerName, initialBottomServerName);
702    Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
703
704    FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
705
706    topLoc = regionLocator.getRegionLocation(startRow, true);
707    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
708    bottomLoc = regionLocator.getRegionLocation(splitKey, true);
709    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
710
711    // Assert that region split is complete
712    Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
713    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName);
714    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
715
716    return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
717      finalEncodedBottomRegionName);
718  }
719
720  /**
721   * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
722   * regions to be merged are adjacent regions. This is a synchronous method.
723   * @param tableName    Table name of regions to be merged.
724   * @param topRegion    Start row key of first region for merging.
725   * @param bottomRegion Start row key of second region for merging.
726   * @return List of encoded region names with first two elements being original regions followed by
727   *         the merged region.
728   */
729  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
730    throws IOException {
731    Admin admin = TEST_UTIL.getAdmin();
732    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
733    HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
734    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
735    String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
736    HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
737    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
738    String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
739
740    // Assert that regions are ready to be merged
741    Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
742    Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
743
744    FutureUtils.get(admin.mergeRegionsAsync(
745      new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
746
747    topLoc = regionLocator.getRegionLocation(topRegion, true);
748    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
749    bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
750    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
751
752    // Assert regions have been merges successfully
753    Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
754    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName);
755    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
756
757    return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
758      finalEncodedTopRegionName);
759  }
760}