001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
025import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
026import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
027import static org.junit.Assert.assertEquals;
028
029import java.io.IOException;
030import java.util.Arrays;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Random;
036import java.util.Set;
037import java.util.concurrent.CountDownLatch;
038import java.util.concurrent.Executors;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042import java.util.concurrent.atomic.AtomicLong;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
051import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.LargeTests;
054import org.apache.hadoop.hbase.util.Bytes;
055import org.apache.hadoop.hbase.util.FutureUtils;
056import org.junit.AfterClass;
057import org.junit.Assert;
058import org.junit.BeforeClass;
059import org.junit.ClassRule;
060import org.junit.Test;
061import org.junit.experimental.categories.Category;
062import org.junit.runners.Parameterized.Parameter;
063import org.junit.runners.Parameterized.Parameters;
064
065@Category({ ClientTests.class, LargeTests.class })
066public class TestTableScanMetrics extends FromClientSideBase {
  // Class-level test rule built for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableScanMetrics.class);

  // Testing utility used to start/stop the mini cluster and to create, load and delete tables.
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // Table shared by the tests that scan the three rows written in setUp().
  private static final TableName TABLE_NAME =
    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());

  // Column family used by every table created in this class.
  private static final byte[] CF = Bytes.toBytes("cf");

  // Column qualifier of the single cell written per row in setUp().
  private static final byte[] CQ = Bytes.toBytes("cq");

  // Cell value written per row in setUp().
  private static final byte[] VALUE = Bytes.toBytes("value");

  // Fixed seed keeps the sleep intervals of the periodic metrics collector reproducible.
  private static final Random RAND = new Random(11);

  // Number of regions of TABLE_NAME; assigned in setUp().
  private static int NUM_REGIONS;

  // Client connection shared by the tests; created in setUp().
  private static Connection CONN;
087
088  @Parameters(name = "{index}: scanner={0}")
089  public static List<Object[]> params() {
090    return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
091      new Object[] { "ReverseScanner", new Scan().setReversed(true) });
092  }
093
  // First element of each params() entry: human readable scanner name used in the test name.
  @Parameter(0)
  public String scannerName;

  // Second element of each params() entry: template scan (forward or reversed) that
  // generateScan() copies for every test.
  @Parameter(1)
  public Scan originalScan;
099
100  @BeforeClass
101  public static void setUp() throws Exception {
102    // Start the minicluster
103    TEST_UTIL.startMiniCluster(2);
104    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
105    // scan hits all the region and not all rows lie in a single region
106    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
107      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
108        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
109        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
110    }
111    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
112    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
113  }
114
115  @AfterClass
116  public static void tearDown() throws Exception {
117    TEST_UTIL.shutdownMiniCluster();
118  }
119
120  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
121    Scan scan = new Scan(originalScan);
122    if (originalScan.isReversed()) {
123      scan.withStartRow(largerRow, true);
124      scan.withStopRow(smallerRow, true);
125    } else {
126      scan.withStartRow(smallerRow, true);
127      scan.withStopRow(largerRow, true);
128    }
129    return scan;
130  }
131
132  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
133    throws IOException {
134    int countOfRows = 0;
135    ScanMetrics scanMetrics;
136    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
137      for (Result result : scanner) {
138        Assert.assertFalse(result.isEmpty());
139        countOfRows++;
140      }
141      scanMetrics = scanner.getScanMetrics();
142    }
143    Assert.assertEquals(expectedCount, countOfRows);
144    return scanMetrics;
145  }
146
147  @Test
148  public void testScanMetricsDisabled() throws Exception {
149    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
150    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
151    Assert.assertNull(scanMetrics);
152  }
153
154  @Test
155  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
156    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
157    scan.setScanMetricsEnabled(true);
158    int expectedRowsScanned = 3;
159    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
160    Assert.assertNotNull(scanMetrics);
161    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
162    // The test setup is such that we have 1 row per region in the scan range
163    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
164    Assert.assertEquals(expectedRowsScanned,
165      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
166    Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
167  }
168
169  @Test
170  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
171    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
172    scan.setScanMetricsEnabled(true);
173    int expectedRowsScanned = 3;
174    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
175    Assert.assertNotNull(scanMetrics);
176    // By default counters are collected with reset as true
177    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
178    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
179    Assert.assertEquals(expectedRowsScanned,
180      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
181    // Subsequent call to get scan metrics map should show all counters as 0
182    Assert.assertEquals(0, scanMetrics.countOfRegions.get());
183    Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
184  }
185
186  @Test
187  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
188    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
189    scan.setEnableScanMetricsByRegion(true);
190    int expectedRowsScanned = 1;
191    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
192    Assert.assertNotNull(scanMetrics);
193    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
194    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
195    Assert.assertEquals(expectedRowsScanned,
196      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
197    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
198      scanMetrics.collectMetricsByRegion(false);
199    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
200    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
201      .entrySet()) {
202      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
203      metricsMap = entry.getValue();
204      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
205      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
206      // As we are scanning single row so, overall scan metrics will match per region scan metrics
207      Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
208      Assert.assertEquals(expectedRowsScanned,
209        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
210    }
211  }
212
213  @Test
214  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
215    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
216    scan.setEnableScanMetricsByRegion(true);
217    int expectedRowsScanned = 3;
218    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
219    Assert.assertNotNull(scanMetrics);
220    Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
221    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
222    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
223      scanMetrics.collectMetricsByRegion(false);
224    Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
225    int rowsScannedAcrossAllRegions = 0;
226    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
227      .entrySet()) {
228      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
229      Map<String, Long> metricsMap = entry.getValue();
230      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
231      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
232      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
233      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
234        rowsScannedAcrossAllRegions++;
235      } else {
236        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
237      }
238    }
239    Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
240  }
241
242  @Test
243  public void testScanMetricsByRegionReset() throws Exception {
244    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
245    scan.setEnableScanMetricsByRegion(true);
246    int expectedRowsScanned = 3;
247    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
248    Assert.assertNotNull(scanMetrics);
249
250    // Retrieve scan metrics by region as a map and reset
251    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
252      scanMetrics.collectMetricsByRegion();
253    // We scan 1 row per region
254    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
255    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
256      .entrySet()) {
257      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
258      Map<String, Long> metricsMap = entry.getValue();
259      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
260      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
261      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
262      Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
263    }
264
265    // Scan metrics have already been reset and now all counters should be 0
266    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
267    // Size of map should be same as earlier
268    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
269    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
270      .entrySet()) {
271      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
272      Map<String, Long> metricsMap = entry.getValue();
273      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
274      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
275      // Counters should have been reset to 0
276      Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
277      Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
278    }
279  }
280
281  @Test
282  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
283    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
284    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
285      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
286    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
287      TEST_UTIL.loadTable(table, CF);
288
289      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
290
291      // Trigger two concurrent threads one of which scans the table and other periodically
292      // collects the scan metrics (along with resetting the counters to 0).
293      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
294      scan.setEnableScanMetricsByRegion(true);
295      scan.setCaching(2);
296      try (ResultScanner rs = table.getScanner(scan)) {
297        ScanMetrics scanMetrics = rs.getScanMetrics();
298        AtomicInteger rowsScanned = new AtomicInteger(0);
299        CountDownLatch latch = new CountDownLatch(1);
300        Runnable tableScanner = new Runnable() {
301          public void run() {
302            for (Result r : rs) {
303              Assert.assertFalse(r.isEmpty());
304              rowsScanned.incrementAndGet();
305            }
306            latch.countDown();
307          }
308        };
309        Runnable metricsCollector =
310          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
311        executor.execute(tableScanner);
312        executor.execute(metricsCollector);
313        latch.await();
314        // Merge leftover scan metrics
315        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
316          concurrentScanMetricsByRegion);
317        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
318      }
319
320      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
321
322      // Collect scan metrics by region from single thread. Assert that concurrent scan
323      // and metrics collection works as expected.
324      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
325      scan.setEnableScanMetricsByRegion(true);
326      scan.setCaching(2);
327      try (ResultScanner rs = table.getScanner(scan)) {
328        ScanMetrics scanMetrics = rs.getScanMetrics();
329        int rowsScanned = 0;
330        for (Result r : rs) {
331          Assert.assertFalse(r.isEmpty());
332          rowsScanned++;
333        }
334        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
335        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
336        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
337          .entrySet()) {
338          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
339          Map<String, Long> metricsMap = entry.getValue();
340          // Remove millis between nexts metric as it is not deterministic
341          metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
342          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
343          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
344          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
345          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
346          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
347          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
348          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
349          Assert.assertTrue(
350            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
351        }
352      }
353
354      // Assert on scan metrics by region
355      Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
356    } finally {
357      TEST_UTIL.deleteTable(tableName);
358    }
359  }
360
361  @Test
362  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
363    final int numThreads = 20;
364    Configuration conf = TEST_UTIL.getConfiguration();
365    // Handler count is 3 by default.
366    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
367      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
368    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
369    // times the handler count.
370    Assert.assertTrue(numThreads > 6 * handlerCount);
371    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
372    TableName tableName = TableName.valueOf(
373      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
374    AtomicLong totalScanRpcTime = new AtomicLong(0);
375    AtomicLong totalQueueWaitTime = new AtomicLong(0);
376    CountDownLatch latch = new CountDownLatch(numThreads);
377    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
378      TEST_UTIL.loadTable(table, CF);
379      for (int i = 0; i < numThreads; i++) {
380        executor.execute(new Runnable() {
381          @Override
382          public void run() {
383            try {
384              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
385              scan.setEnableScanMetricsByRegion(true);
386              scan.setCaching(2);
387              try (ResultScanner rs = table.getScanner(scan)) {
388                Result r;
389                while ((r = rs.next()) != null) {
390                  Assert.assertFalse(r.isEmpty());
391                }
392                ScanMetrics scanMetrics = rs.getScanMetrics();
393                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
394                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
395                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
396              }
397              latch.countDown();
398            } catch (IOException e) {
399              throw new RuntimeException(e);
400            }
401          }
402        });
403      }
404      latch.await();
405      executor.shutdown();
406      executor.awaitTermination(10, TimeUnit.SECONDS);
407      Assert.assertTrue(totalScanRpcTime.get() > 0);
408      Assert.assertTrue(totalQueueWaitTime.get() > 0);
409    } finally {
410      TEST_UTIL.deleteTable(tableName);
411    }
412  }
413
414  @Test
415  public void testScanMetricsByRegionWithRegionMove() throws Exception {
416    TableName tableName = TableName.valueOf(
417      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
418    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
419      TEST_UTIL.loadTable(table, CF);
420
421      // Scan 2 regions with start row keys: bbb and ccc
422      byte[] bbb = Bytes.toBytes("bbb");
423      byte[] ccc = Bytes.toBytes("ccc");
424      byte[] ddc = Bytes.toBytes("ddc");
425      long expectedCountOfRowsScannedInMovedRegion = 0;
426      // ROWS is the data loaded by loadTable()
427      for (byte[] row : HBaseTestingUtil.ROWS) {
428        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
429          expectedCountOfRowsScannedInMovedRegion++;
430        }
431      }
432      byte[] movedRegion = null;
433      ScanMetrics scanMetrics;
434
435      // Initialize scan with maxResultSize as size of 50 rows.
436      Scan scan = generateScan(bbb, ddc);
437      scan.setEnableScanMetricsByRegion(true);
438      scan.setMaxResultSize(8000);
439
440      try (ResultScanner rs = table.getScanner(scan)) {
441        boolean isFirstScanOfRegion = true;
442        for (Result r : rs) {
443          byte[] row = r.getRow();
444          if (isFirstScanOfRegion) {
445            movedRegion = moveRegion(tableName, row);
446            isFirstScanOfRegion = false;
447          }
448        }
449        Assert.assertNotNull(movedRegion);
450
451        scanMetrics = rs.getScanMetrics();
452        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
453          scanMetrics.collectMetricsByRegion();
454        long actualCountOfRowsScannedInMovedRegion = 0;
455        Set<ServerName> serversForMovedRegion = new HashSet<>();
456
457        // 2 regions scanned with two entries for first region as it moved in b/w scan
458        Assert.assertEquals(3, scanMetricsByRegion.size());
459        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
460          .entrySet()) {
461          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
462          Map<String, Long> metricsMap = entry.getValue();
463          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
464            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
465            actualCountOfRowsScannedInMovedRegion += rowsScanned;
466            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
467
468            Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
469          }
470          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
471        }
472        Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
473          actualCountOfRowsScannedInMovedRegion);
474        Assert.assertEquals(2, serversForMovedRegion.size());
475      }
476    } finally {
477      TEST_UTIL.deleteTable(tableName);
478    }
479  }
480
481  @Test
482  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
483    TableName tableName = TableName.valueOf(
484      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
485    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
486      TEST_UTIL.loadTable(table, CF);
487
488      // Scan 1 region with start row key: bbb
489      byte[] bbb = Bytes.toBytes("bbb");
490      byte[] bmw = Bytes.toBytes("bmw");
491      byte[] ccb = Bytes.toBytes("ccb");
492      long expectedCountOfRowsScannedInRegion = 0;
493      // ROWS is the data loaded by loadTable()
494      for (byte[] row : HBaseTestingUtil.ROWS) {
495        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
496          expectedCountOfRowsScannedInRegion++;
497        }
498      }
499      ScanMetrics scanMetrics;
500      Set<String> expectedSplitRegionRes = new HashSet<>();
501
502      // Initialize scan
503      Scan scan = generateScan(bbb, ccb);
504      scan.setEnableScanMetricsByRegion(true);
505      scan.setMaxResultSize(8000);
506
507      try (ResultScanner rs = table.getScanner(scan)) {
508        boolean isFirstScanOfRegion = true;
509        for (Result r : rs) {
510          if (isFirstScanOfRegion) {
511            splitRegion(tableName, bbb, bmw)
512              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
513            isFirstScanOfRegion = false;
514          }
515        }
516
517        scanMetrics = rs.getScanMetrics();
518        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
519          scanMetrics.collectMetricsByRegion();
520
521        long actualCountOfRowsScannedInRegion = 0;
522        long rpcRetiesCount = 0;
523        Set<String> splitRegionRes = new HashSet<>();
524
525        // 1 entry each for parent and two child regions
526        Assert.assertEquals(3, scanMetricsByRegion.size());
527        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
528          .entrySet()) {
529          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
530          Map<String, Long> metricsMap = entry.getValue();
531          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
532          actualCountOfRowsScannedInRegion += rowsScanned;
533          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
534
535          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
536            rpcRetiesCount++;
537          }
538
539          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
540        }
541        Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
542        Assert.assertEquals(2, rpcRetiesCount);
543        Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
544      }
545    } finally {
546      TEST_UTIL.deleteTable(tableName);
547    }
548  }
549
550  @Test
551  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
552    TableName tableName = TableName.valueOf(
553      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
554    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
555      TEST_UTIL.loadTable(table, CF);
556
557      // Scan 2 regions with start row keys: bbb and ccc
558      byte[] bbb = Bytes.toBytes("bbb");
559      byte[] ccc = Bytes.toBytes("ccc");
560      byte[] ddc = Bytes.toBytes("ddc");
561      long expectedCountOfRowsScannedInRegions = 0;
562      // ROWS is the data loaded by loadTable()
563      for (byte[] row : HBaseTestingUtil.ROWS) {
564        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
565          expectedCountOfRowsScannedInRegions++;
566        }
567      }
568      ScanMetrics scanMetrics;
569      Set<String> expectedMergeRegionsRes = new HashSet<>();
570      String mergedRegionEncodedName = null;
571
572      // Initialize scan
573      Scan scan = generateScan(bbb, ddc);
574      scan.setEnableScanMetricsByRegion(true);
575      scan.setMaxResultSize(8000);
576
577      try (ResultScanner rs = table.getScanner(scan)) {
578        boolean isFirstScanOfRegion = true;
579        for (Result r : rs) {
580          if (isFirstScanOfRegion) {
581            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
582            // Entry with index 2 is the encoded region name of merged region
583            mergedRegionEncodedName = Bytes.toString(out.get(2));
584            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
585            isFirstScanOfRegion = false;
586          }
587        }
588
589        scanMetrics = rs.getScanMetrics();
590        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
591          scanMetrics.collectMetricsByRegion();
592        long actualCountOfRowsScannedInRegions = 0;
593        Set<String> mergeRegionsRes = new HashSet<>();
594        boolean containsMergedRegionInScanMetrics = false;
595
596        // 1 entry each for old region from which first row was scanned and new merged region
597        Assert.assertEquals(2, scanMetricsByRegion.size());
598        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
599          .entrySet()) {
600          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
601          Map<String, Long> metricsMap = entry.getValue();
602          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
603          actualCountOfRowsScannedInRegions += rowsScanned;
604          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
605          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
606            containsMergedRegionInScanMetrics = true;
607          }
608
609          Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
610          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
611        }
612        Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
613        Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
614        Assert.assertTrue(containsMergedRegionInScanMetrics);
615      }
616    } finally {
617      TEST_UTIL.deleteTable(tableName);
618    }
619  }
620
621  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
622    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
623    CountDownLatch latch) {
624    return new Runnable() {
625      public void run() {
626        try {
627          while (latch.getCount() > 0) {
628            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
629              scanMetrics.collectMetricsByRegion();
630            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
631            Thread.sleep(RAND.nextInt(10));
632          }
633        } catch (InterruptedException e) {
634          throw new RuntimeException(e);
635        }
636      }
637    };
638  }
639
640  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
641    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
642    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
643      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
644      Map<String, Long> metricsMap = entry.getValue();
645      // Remove millis between nexts metric as it is not deterministic
646      metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
647      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
648      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
649      if (dstMap.containsKey(scanMetricsRegionInfo)) {
650        Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
651        for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
652          String metricName = metricEntry.getKey();
653          Long existingValue = dstMetricsMap.get(metricName);
654          Long newValue = metricEntry.getValue();
655          dstMetricsMap.put(metricName, existingValue + newValue);
656        }
657      } else {
658        dstMap.put(scanMetricsRegionInfo, metricsMap);
659      }
660    }
661  }
662
663  /**
664   * Moves the region with start row key from its original region server to some other region
665   * server. This is a synchronous method.
666   * @param tableName Table name of region to be moved belongs.
667   * @param startRow  Start row key of the region to be moved.
668   * @return Encoded region name of the region which was moved.
669   */
670  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
671    Admin admin = TEST_UTIL.getAdmin();
672    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
673    HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
674    byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
675    ServerName initialServerName = loc.getServerName();
676
677    admin.move(encodedRegionName);
678
679    ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
680
681    // Assert that region actually moved
682    Assert.assertNotEquals(initialServerName, finalServerName);
683    return encodedRegionName;
684  }
685
686  /**
687   * Splits the region with start row key at the split key provided. This is a synchronous method.
688   * @param tableName Table name of region to be split.
689   * @param startRow  Start row key of the region to be split.
690   * @param splitKey  Split key for splitting the region.
691   * @return List of encoded region names with first element being parent region followed by two
692   *         child regions.
693   */
694  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
695    throws IOException {
696    Admin admin = TEST_UTIL.getAdmin();
697    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
698    HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
699    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
700    ServerName initialTopServerName = topLoc.getServerName();
701    HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
702    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
703    ServerName initialBottomServerName = bottomLoc.getServerName();
704
705    // Assert region is ready for split
706    Assert.assertEquals(initialTopServerName, initialBottomServerName);
707    Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
708
709    FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
710
711    topLoc = regionLocator.getRegionLocation(startRow, true);
712    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
713    bottomLoc = regionLocator.getRegionLocation(splitKey, true);
714    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
715
716    // Assert that region split is complete
717    Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
718    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName);
719    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
720
721    return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
722      finalEncodedBottomRegionName);
723  }
724
725  /**
726   * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
727   * regions to be merged are adjacent regions. This is a synchronous method.
728   * @param tableName    Table name of regions to be merged.
729   * @param topRegion    Start row key of first region for merging.
730   * @param bottomRegion Start row key of second region for merging.
731   * @return List of encoded region names with first two elements being original regions followed by
732   *         the merged region.
733   */
734  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
735    throws IOException {
736    Admin admin = TEST_UTIL.getAdmin();
737    RegionLocator regionLocator = CONN.getRegionLocator(tableName);
738    HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
739    byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
740    String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
741    HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
742    byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
743    String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
744
745    // Assert that regions are ready to be merged
746    Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
747    Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
748
749    FutureUtils.get(admin.mergeRegionsAsync(
750      new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
751
752    topLoc = regionLocator.getRegionLocation(topRegion, true);
753    byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
754    bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
755    byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
756
757    // Assert regions have been merges successfully
758    Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
759    Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName);
760    Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
761
762    return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
763      finalEncodedTopRegionName);
764  }
765}