001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; 021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; 022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; 023import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; 024import static org.junit.Assert.assertEquals; 025 026import java.io.IOException; 027import java.util.Arrays; 028import java.util.HashMap; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.CountDownLatch; 035import java.util.concurrent.Executors; 036import java.util.concurrent.ThreadPoolExecutor; 037import java.util.concurrent.atomic.AtomicInteger; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtil; 040import org.apache.hadoop.hbase.HRegionLocation; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 044import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; 045import org.apache.hadoop.hbase.testclassification.ClientTests; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.apache.hadoop.hbase.util.FutureUtils; 049import org.junit.AfterClass; 050import org.junit.Assert; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055import org.junit.runners.Parameterized.Parameter; 056import org.junit.runners.Parameterized.Parameters; 057 058@Category({ ClientTests.class, MediumTests.class }) 059public class TestTableScanMetrics extends FromClientSideBase { 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestTableScanMetrics.class); 063 064 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 065 066 private static final TableName TABLE_NAME = 067 TableName.valueOf(TestTableScanMetrics.class.getSimpleName()); 068 069 private static final byte[] CF = Bytes.toBytes("cf"); 070 071 private static final byte[] CQ = Bytes.toBytes("cq"); 072 073 private static final byte[] VALUE = Bytes.toBytes("value"); 074 075 private static final Random RAND = new Random(11); 076 077 private static int NUM_REGIONS; 078 079 private static Connection CONN; 080 081 @Parameters(name = "{index}: scanner={0}") 082 public static List<Object[]> params() { 083 return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, 084 new Object[] { "ReverseScanner", new Scan().setReversed(true) }); 085 } 086 087 @Parameter(0) 088 public String scannerName; 089 090 @Parameter(1) 091 public Scan originalScan; 092 093 @BeforeClass 094 public static void setUp() throws Exception { 095 // Start the minicluster 096 TEST_UTIL.startMiniCluster(2); 097 // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that 098 // scan hits all the region and not all rows lie in a single region 099 try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 100 table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), 101 new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), 102 new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); 103 } 104 CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 105 NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); 106 } 107 108 @AfterClass 109 public static void tearDown() throws Exception { 110 TEST_UTIL.shutdownMiniCluster(); 111 } 112 113 private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { 114 Scan scan = new Scan(originalScan); 115 if (originalScan.isReversed()) { 116 scan.withStartRow(largerRow, true); 117 scan.withStopRow(smallerRow, true); 118 } else { 119 scan.withStartRow(smallerRow, true); 120 scan.withStopRow(largerRow, true); 121 } 122 return scan; 123 } 124 125 private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount) 126 throws IOException { 127 int countOfRows = 0; 128 ScanMetrics scanMetrics; 129 try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { 130 for (Result result : scanner) { 131 Assert.assertFalse(result.isEmpty()); 132 countOfRows++; 133 } 134 scanMetrics = scanner.getScanMetrics(); 135 } 136 Assert.assertEquals(expectedCount, countOfRows); 137 return scanMetrics; 138 } 139 140 @Test 141 public void testScanMetricsDisabled() throws Exception { 142 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 143 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); 144 Assert.assertNull(scanMetrics); 145 } 146 147 @Test 148 public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception { 149 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 150 scan.setScanMetricsEnabled(true); 151 int expectedRowsScanned = 3; 152 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 153 Assert.assertNotNull(scanMetrics); 154 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false); 155 // The test setup is such that we have 1 row per region in the scan range 156 Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get()); 157 Assert.assertEquals(expectedRowsScanned, 158 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 159 Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty()); 160 } 161 162 @Test 163 public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception { 164 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 165 scan.setScanMetricsEnabled(true); 166 int expectedRowsScanned = 3; 167 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 168 Assert.assertNotNull(scanMetrics); 169 // By default counters are collected with reset as true 170 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(); 171 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 172 Assert.assertEquals(expectedRowsScanned, 173 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 174 // Subsequent call to get scan metrics map should show all counters as 0 175 Assert.assertEquals(0, scanMetrics.countOfRegions.get()); 176 Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get()); 177 } 178 179 @Test 180 public void testScanMetricsByRegionForSingleRegionScan() throws Exception { 181 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1")); 182 scan.setEnableScanMetricsByRegion(true); 183 int expectedRowsScanned = 1; 184 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 185 Assert.assertNotNull(scanMetrics); 186 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false); 187 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 188 Assert.assertEquals(expectedRowsScanned, 189 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 190 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 191 scanMetrics.collectMetricsByRegion(false); 192 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 193 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 194 .entrySet()) { 195 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 196 metricsMap = entry.getValue(); 197 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 198 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 199 // As we are scanning single row so, overall scan metrics will match per region scan metrics 200 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 201 Assert.assertEquals(expectedRowsScanned, 202 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 203 } 204 } 205 206 @Test 207 public void testScanMetricsByRegionForMultiRegionScan() throws Exception { 208 Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 209 scan.setEnableScanMetricsByRegion(true); 210 int expectedRowsScanned = 3; 211 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 212 Assert.assertNotNull(scanMetrics); 213 Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); 214 Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get()); 215 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 216 scanMetrics.collectMetricsByRegion(false); 217 Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size()); 218 int rowsScannedAcrossAllRegions = 0; 219 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 220 .entrySet()) { 221 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 222 Map<String, Long> metricsMap = entry.getValue(); 223 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 224 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 225 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 226 if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) { 227 rowsScannedAcrossAllRegions++; 228 } else { 229 assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 230 } 231 } 232 Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions); 233 } 234 235 @Test 236 public void testScanMetricsByRegionReset() throws Exception { 237 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 238 scan.setEnableScanMetricsByRegion(true); 239 int expectedRowsScanned = 3; 240 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 241 Assert.assertNotNull(scanMetrics); 242 243 // Retrieve scan metrics by region as a map and reset 244 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 245 scanMetrics.collectMetricsByRegion(); 246 // We scan 1 row per region 247 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 248 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 249 .entrySet()) { 250 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 251 Map<String, Long> metricsMap = entry.getValue(); 252 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 253 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 254 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 255 Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 256 } 257 258 // Scan metrics have already been reset and now all counters should be 0 259 scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false); 260 // Size of map should be same as earlier 261 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 262 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 263 .entrySet()) { 264 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 265 Map<String, Long> metricsMap = entry.getValue(); 266 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 267 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 268 // Counters should have been reset to 0 269 Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 270 Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 271 } 272 } 273 274 @Test 275 public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception { 276 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); 277 TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName() 278 + "_testConcurrentUpdatesAndResetToScanMetricsByRegion"); 279 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 280 TEST_UTIL.loadTable(table, CF); 281 282 Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>(); 283 284 // Trigger two concurrent threads one of which scans the table and other periodically 285 // collects the scan metrics (along with resetting the counters to 0). 286 Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 287 scan.setEnableScanMetricsByRegion(true); 288 scan.setCaching(2); 289 try (ResultScanner rs = table.getScanner(scan)) { 290 ScanMetrics scanMetrics = rs.getScanMetrics(); 291 AtomicInteger rowsScanned = new AtomicInteger(0); 292 CountDownLatch latch = new CountDownLatch(1); 293 Runnable tableScanner = new Runnable() { 294 public void run() { 295 for (Result r : rs) { 296 Assert.assertFalse(r.isEmpty()); 297 rowsScanned.incrementAndGet(); 298 } 299 latch.countDown(); 300 } 301 }; 302 Runnable metricsCollector = 303 getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch); 304 executor.execute(tableScanner); 305 executor.execute(metricsCollector); 306 latch.await(); 307 // Merge leftover scan metrics 308 mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(), 309 concurrentScanMetricsByRegion); 310 Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get()); 311 } 312 313 Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion; 314 315 // Collect scan metrics by region from single thread. Assert that concurrent scan 316 // and metrics collection works as expected. 317 scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 318 scan.setEnableScanMetricsByRegion(true); 319 scan.setCaching(2); 320 try (ResultScanner rs = table.getScanner(scan)) { 321 ScanMetrics scanMetrics = rs.getScanMetrics(); 322 int rowsScanned = 0; 323 for (Result r : rs) { 324 Assert.assertFalse(r.isEmpty()); 325 rowsScanned++; 326 } 327 Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned); 328 expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion(); 329 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion 330 .entrySet()) { 331 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 332 Map<String, Long> metricsMap = entry.getValue(); 333 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 334 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 335 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 336 // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row 337 long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 338 Assert.assertTrue( 339 rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1)); 340 } 341 } 342 343 // Assert on scan metrics by region 344 Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion); 345 } finally { 346 TEST_UTIL.deleteTable(tableName); 347 } 348 } 349 350 @Test 351 public void testScanMetricsByRegionWithRegionMove() throws Exception { 352 TableName tableName = TableName.valueOf( 353 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove"); 354 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 355 TEST_UTIL.loadTable(table, CF); 356 357 // Scan 2 regions with start row keys: bbb and ccc 358 byte[] bbb = Bytes.toBytes("bbb"); 359 byte[] ccc = Bytes.toBytes("ccc"); 360 byte[] ddc = Bytes.toBytes("ddc"); 361 long expectedCountOfRowsScannedInMovedRegion = 0; 362 // ROWS is the data loaded by loadTable() 363 for (byte[] row : HBaseTestingUtil.ROWS) { 364 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) { 365 expectedCountOfRowsScannedInMovedRegion++; 366 } 367 } 368 byte[] movedRegion = null; 369 ScanMetrics scanMetrics; 370 371 // Initialize scan with maxResultSize as size of 50 rows. 372 Scan scan = generateScan(bbb, ddc); 373 scan.setEnableScanMetricsByRegion(true); 374 scan.setMaxResultSize(8000); 375 376 try (ResultScanner rs = table.getScanner(scan)) { 377 boolean isFirstScanOfRegion = true; 378 for (Result r : rs) { 379 byte[] row = r.getRow(); 380 if (isFirstScanOfRegion) { 381 movedRegion = moveRegion(tableName, row); 382 isFirstScanOfRegion = false; 383 } 384 } 385 Assert.assertNotNull(movedRegion); 386 387 scanMetrics = rs.getScanMetrics(); 388 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 389 scanMetrics.collectMetricsByRegion(); 390 long actualCountOfRowsScannedInMovedRegion = 0; 391 Set<ServerName> serversForMovedRegion = new HashSet<>(); 392 393 // 2 regions scanned with two entries for first region as it moved in b/w scan 394 Assert.assertEquals(3, scanMetricsByRegion.size()); 395 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 396 .entrySet()) { 397 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 398 Map<String, Long> metricsMap = entry.getValue(); 399 if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) { 400 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 401 actualCountOfRowsScannedInMovedRegion += rowsScanned; 402 serversForMovedRegion.add(scanMetricsRegionInfo.getServerName()); 403 404 Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME)); 405 } 406 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 407 } 408 Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion, 409 actualCountOfRowsScannedInMovedRegion); 410 Assert.assertEquals(2, serversForMovedRegion.size()); 411 } 412 } finally { 413 TEST_UTIL.deleteTable(tableName); 414 } 415 } 416 417 @Test 418 public void testScanMetricsByRegionWithRegionSplit() throws Exception { 419 TableName tableName = TableName.valueOf( 420 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit"); 421 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 422 TEST_UTIL.loadTable(table, CF); 423 424 // Scan 1 region with start row key: bbb 425 byte[] bbb = Bytes.toBytes("bbb"); 426 byte[] bmw = Bytes.toBytes("bmw"); 427 byte[] ccb = Bytes.toBytes("ccb"); 428 long expectedCountOfRowsScannedInRegion = 0; 429 // ROWS is the data loaded by loadTable() 430 for (byte[] row : HBaseTestingUtil.ROWS) { 431 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) { 432 expectedCountOfRowsScannedInRegion++; 433 } 434 } 435 ScanMetrics scanMetrics; 436 Set<String> expectedSplitRegionRes = new HashSet<>(); 437 438 // Initialize scan 439 Scan scan = generateScan(bbb, ccb); 440 scan.setEnableScanMetricsByRegion(true); 441 scan.setMaxResultSize(8000); 442 443 try (ResultScanner rs = table.getScanner(scan)) { 444 boolean isFirstScanOfRegion = true; 445 for (Result r : rs) { 446 if (isFirstScanOfRegion) { 447 splitRegion(tableName, bbb, bmw) 448 .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region))); 449 isFirstScanOfRegion = false; 450 } 451 } 452 453 scanMetrics = rs.getScanMetrics(); 454 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 455 scanMetrics.collectMetricsByRegion(); 456 457 long actualCountOfRowsScannedInRegion = 0; 458 long rpcRetiesCount = 0; 459 Set<String> splitRegionRes = new HashSet<>(); 460 461 // 1 entry each for parent and two child regions 462 Assert.assertEquals(3, scanMetricsByRegion.size()); 463 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 464 .entrySet()) { 465 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 466 Map<String, Long> metricsMap = entry.getValue(); 467 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 468 actualCountOfRowsScannedInRegion += rowsScanned; 469 splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName()); 470 471 if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) { 472 rpcRetiesCount++; 473 } 474 475 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 476 } 477 Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion); 478 Assert.assertEquals(2, rpcRetiesCount); 479 Assert.assertEquals(expectedSplitRegionRes, splitRegionRes); 480 } 481 } finally { 482 TEST_UTIL.deleteTable(tableName); 483 } 484 } 485 486 @Test 487 public void testScanMetricsByRegionWithRegionMerge() throws Exception { 488 TableName tableName = TableName.valueOf( 489 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge"); 490 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 491 TEST_UTIL.loadTable(table, CF); 492 493 // Scan 2 regions with start row keys: bbb and ccc 494 byte[] bbb = Bytes.toBytes("bbb"); 495 byte[] ccc = Bytes.toBytes("ccc"); 496 byte[] ddc = Bytes.toBytes("ddc"); 497 long expectedCountOfRowsScannedInRegions = 0; 498 // ROWS is the data loaded by loadTable() 499 for (byte[] row : HBaseTestingUtil.ROWS) { 500 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) { 501 expectedCountOfRowsScannedInRegions++; 502 } 503 } 504 ScanMetrics scanMetrics; 505 Set<String> expectedMergeRegionsRes = new HashSet<>(); 506 String mergedRegionEncodedName = null; 507 508 // Initialize scan 509 Scan scan = generateScan(bbb, ddc); 510 scan.setEnableScanMetricsByRegion(true); 511 scan.setMaxResultSize(8000); 512 513 try (ResultScanner rs = table.getScanner(scan)) { 514 boolean isFirstScanOfRegion = true; 515 for (Result r : rs) { 516 if (isFirstScanOfRegion) { 517 List<byte[]> out = mergeRegions(tableName, bbb, ccc); 518 // Entry with index 2 is the encoded region name of merged region 519 mergedRegionEncodedName = Bytes.toString(out.get(2)); 520 out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region))); 521 isFirstScanOfRegion = false; 522 } 523 } 524 525 scanMetrics = rs.getScanMetrics(); 526 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 527 scanMetrics.collectMetricsByRegion(); 528 long actualCountOfRowsScannedInRegions = 0; 529 Set<String> mergeRegionsRes = new HashSet<>(); 530 boolean containsMergedRegionInScanMetrics = false; 531 532 // 1 entry each for old region from which first row was scanned and new merged region 533 Assert.assertEquals(2, scanMetricsByRegion.size()); 534 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 535 .entrySet()) { 536 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 537 Map<String, Long> metricsMap = entry.getValue(); 538 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 539 actualCountOfRowsScannedInRegions += rowsScanned; 540 mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName()); 541 if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) { 542 containsMergedRegionInScanMetrics = true; 543 } 544 545 Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME)); 546 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 547 } 548 Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions); 549 Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes)); 550 Assert.assertTrue(containsMergedRegionInScanMetrics); 551 } 552 } finally { 553 TEST_UTIL.deleteTable(tableName); 554 } 555 } 556 557 private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics, 558 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection, 559 CountDownLatch latch) { 560 return new Runnable() { 561 public void run() { 562 try { 563 while (latch.getCount() > 0) { 564 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 565 scanMetrics.collectMetricsByRegion(); 566 mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection); 567 Thread.sleep(RAND.nextInt(10)); 568 } 569 } catch (InterruptedException e) { 570 throw new RuntimeException(e); 571 } 572 } 573 }; 574 } 575 576 private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap, 577 Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) { 578 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) { 579 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 580 Map<String, Long> metricsMap = entry.getValue(); 581 if (dstMap.containsKey(scanMetricsRegionInfo)) { 582 Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo); 583 for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) { 584 String metricName = metricEntry.getKey(); 585 Long existingValue = dstMetricsMap.get(metricName); 586 Long newValue = metricEntry.getValue(); 587 dstMetricsMap.put(metricName, existingValue + newValue); 588 } 589 } else { 590 dstMap.put(scanMetricsRegionInfo, metricsMap); 591 } 592 } 593 } 594 595 /** 596 * Moves the region with start row key from its original region server to some other region 597 * server. This is a synchronous method. 598 * @param tableName Table name of region to be moved belongs. 599 * @param startRow Start row key of the region to be moved. 600 * @return Encoded region name of the region which was moved. 601 */ 602 private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException { 603 Admin admin = TEST_UTIL.getAdmin(); 604 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 605 HRegionLocation loc = regionLocator.getRegionLocation(startRow, true); 606 byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes(); 607 ServerName initialServerName = loc.getServerName(); 608 609 admin.move(encodedRegionName); 610 611 ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName(); 612 613 // Assert that region actually moved 614 Assert.assertNotEquals(initialServerName, finalServerName); 615 return encodedRegionName; 616 } 617 618 /** 619 * Splits the region with start row key at the split key provided. This is a synchronous method. 620 * @param tableName Table name of region to be split. 621 * @param startRow Start row key of the region to be split. 622 * @param splitKey Split key for splitting the region. 623 * @return List of encoded region names with first element being parent region followed by two 624 * child regions. 625 */ 626 private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey) 627 throws IOException { 628 Admin admin = TEST_UTIL.getAdmin(); 629 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 630 HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true); 631 byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 632 ServerName initialTopServerName = topLoc.getServerName(); 633 HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true); 634 byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 635 ServerName initialBottomServerName = bottomLoc.getServerName(); 636 637 // Assert region is ready for split 638 Assert.assertEquals(initialTopServerName, initialBottomServerName); 639 Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName); 640 641 FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey)); 642 643 topLoc = regionLocator.getRegionLocation(startRow, true); 644 byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 645 bottomLoc = regionLocator.getRegionLocation(splitKey, true); 646 byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 647 648 // Assert that region split is complete 649 Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName); 650 Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName); 651 Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName); 652 653 return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName, 654 finalEncodedBottomRegionName); 655 } 656 657 /** 658 * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the 659 * regions to be merged are adjacent regions. This is a synchronous method. 660 * @param tableName Table name of regions to be merged. 661 * @param topRegion Start row key of first region for merging. 662 * @param bottomRegion Start row key of second region for merging. 663 * @return List of encoded region names with first two elements being original regions followed by 664 * the merged region. 665 */ 666 private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion) 667 throws IOException { 668 Admin admin = TEST_UTIL.getAdmin(); 669 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 670 HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true); 671 byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 672 String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey()); 673 HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true); 674 byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 675 String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey()); 676 677 // Assert that regions are ready to be merged 678 Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName); 679 Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey); 680 681 FutureUtils.get(admin.mergeRegionsAsync( 682 new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false)); 683 684 topLoc = regionLocator.getRegionLocation(topRegion, true); 685 byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 686 bottomLoc = regionLocator.getRegionLocation(bottomRegion, true); 687 byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 688 689 // Assert regions have been merges successfully 690 Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName); 691 Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName); 692 Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName); 693 694 return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName, 695 finalEncodedTopRegionName); 696 } 697}