/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.params.provider.Arguments;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Tests the scan metrics exposed by {@link ResultScanner#getScanMetrics()} for table scans, both
 * the aggregate counters and the per-region breakdown returned by
 * {@link ScanMetrics#collectMetricsByRegion()}. Every test runs once with a forward scan and once
 * with a reversed scan (see {@link #parameters()}).
 */
@Tag(ClientTests.TAG)
@Tag(LargeTests.TAG)
@HBaseParameterizedTestTemplate(name = "{index}: scanner={0}")
public class TestTableScanMetrics {

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME =
    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  // Fixed seed so that the sleep intervals of the periodic metrics collector are reproducible
  private static final Random RAND = new Random(11);

  private static int NUM_REGIONS;

  private static Connection CONN;

  /**
   * Supplies the test parameters: a display name and the template {@link Scan} (forward or
   * reversed) that each test copies via {@link #generateScan(byte[], byte[])}.
   */
  public static Stream<Arguments> parameters() {
    return Stream.of(Arguments.of("ForwardScanner", new Scan()),
      Arguments.of("ReverseScanner", new Scan().setReversed(true)));
  }

  private final Scan originalScan;

  /**
   * @param scannerName  Display name of the parameter set; only used in the test template name.
   * @param originalScan Template scan which decides whether the tests scan forward or in reverse.
   */
  public TestTableScanMetrics(String scannerName, Scan originalScan) {
    this.originalScan = originalScan;
  }

  @BeforeAll
  public static void setUp() throws Exception {
    // Start the minicluster
    TEST_UTIL.startMiniCluster(2);
    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
    // scan hits all the region and not all rows lie in a single region
    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
    }
    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
  }

  @AfterAll
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a copy of the template scan covering the inclusive range between the two rows. For a
   * reversed scan the larger row becomes the start row and the smaller row the stop row.
   * @param smallerRow Lower bound of the range (inclusive).
   * @param largerRow  Upper bound of the range (inclusive).
   * @return A new scan with start and stop rows set according to the scan direction.
   */
  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
    Scan scan = new Scan(originalScan);
    if (originalScan.isReversed()) {
      scan.withStartRow(largerRow, true);
      scan.withStopRow(smallerRow, true);
    } else {
      scan.withStartRow(smallerRow, true);
      scan.withStopRow(largerRow, true);
    }
    return scan;
  }

  /**
   * Runs the scan against {@link #TABLE_NAME}, asserts that every result is non-empty and that the
   * number of rows returned matches the expectation.
   * @param scan          Scan to execute.
   * @param expectedCount Number of rows the scan is expected to return.
   * @return Scan metrics as reported by the scanner once it is exhausted.
   */
  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
    throws IOException {
    int countOfRows = 0;
    ScanMetrics scanMetrics;
    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        assertFalse(result.isEmpty());
        countOfRows++;
      }
      scanMetrics = scanner.getScanMetrics();
    }
    assertEquals(expectedCount, countOfRows);
    return scanMetrics;
  }

  @TestTemplate
  public void testScanMetricsDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
    assertNull(scanMetrics);
  }

  @TestTemplate
  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setScanMetricsEnabled(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    assertNotNull(scanMetrics);
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
    // The test setup is such that we have 1 row per region in the scan range
    assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
  }

  @TestTemplate
  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setScanMetricsEnabled(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    assertNotNull(scanMetrics);
    // By default counters are collected with reset as true
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
    assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    // After the resetting read above, the underlying counters should all be 0
    assertEquals(0, scanMetrics.countOfRegions.get());
    assertEquals(0, scanMetrics.countOfRowsScanned.get());
  }

  @TestTemplate
  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 1;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    assertNotNull(scanMetrics);
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
    assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
    assertEquals(expectedRowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      metricsMap = entry.getValue();
      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      assertNotNull(scanMetricsRegionInfo.getServerName());
      // As we are scanning single row so, overall scan metrics will match per region scan metrics
      assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      assertEquals(expectedRowsScanned,
        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }
  }

  @TestTemplate
  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    assertNotNull(scanMetrics);
    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
    assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
    int rowsScannedAcrossAllRegions = 0;
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      assertNotNull(scanMetricsRegionInfo.getServerName());
      assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      // Every region holds either exactly one of the three rows or no row at all
      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
        rowsScannedAcrossAllRegions++;
      } else {
        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
      }
    }
    assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
  }

  @TestTemplate
  public void testScanMetricsByRegionReset() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    assertNotNull(scanMetrics);

    // Retrieve scan metrics by region as a map and reset
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion();
    // We scan 1 row per region
    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      assertNotNull(scanMetricsRegionInfo.getServerName());
      assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }

    // Scan metrics have already been reset and now all counters should be 0
    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
    // Size of map should be same as earlier
    assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      assertNotNull(scanMetricsRegionInfo.getServerName());
      // Counters should have been reset to 0
      assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }
  }

  @TestTemplate
  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();

      // Trigger two concurrent threads one of which scans the table and other periodically
      // collects the scan metrics (along with resetting the counters to 0).
      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
      scan.setEnableScanMetricsByRegion(true);
      scan.setCaching(2);
      try (ResultScanner rs = table.getScanner(scan)) {
        ScanMetrics scanMetrics = rs.getScanMetrics();
        AtomicInteger rowsScanned = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(1);
        Runnable tableScanner = new Runnable() {
          @Override
          public void run() {
            try {
              for (Result r : rs) {
                assertFalse(r.isEmpty());
                rowsScanned.incrementAndGet();
              }
            } finally {
              // Always release the latch, otherwise a failed scan would hang the test and keep
              // the metrics collector spinning forever
              latch.countDown();
            }
          }
        };
        Runnable metricsCollector =
          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
        executor.execute(tableScanner);
        executor.execute(metricsCollector);
        latch.await();
        // The collector may still be in the middle of a merge into the (non thread-safe) map when
        // the latch opens, so wait for both tasks to finish before touching the map from here
        executor.shutdown();
        assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS),
          "Scanner and metrics collector threads did not terminate in time");
        // Merge leftover scan metrics
        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
          concurrentScanMetricsByRegion);
        assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
      }

      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;

      // Collect scan metrics by region from single thread. Assert that concurrent scan
      // and metrics collection works as expected.
      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
      scan.setEnableScanMetricsByRegion(true);
      scan.setCaching(2);
      try (ResultScanner rs = table.getScanner(scan)) {
        ScanMetrics scanMetrics = rs.getScanMetrics();
        int rowsScanned = 0;
        for (Result r : rs) {
          assertFalse(r.isEmpty());
          rowsScanned++;
        }
        assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          // Remove the timing based metrics as they are not deterministic
          metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
          assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
          assertNotNull(scanMetricsRegionInfo.getServerName());
          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          assertTrue(rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
        }
      }

      // Assert on scan metrics by region
      assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
    } finally {
      // No-op on the success path; stops the worker threads if the test failed midway
      executor.shutdownNow();
      TEST_UTIL.deleteTable(tableName);
    }
  }

  @TestTemplate
  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
    final int numThreads = 20;
    Configuration conf = TEST_UTIL.getConfiguration();
    // Handler count is 3 by default.
    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
    // times the handler count.
    assertTrue(numThreads > 6 * handlerCount);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
    AtomicLong totalScanRpcTime = new AtomicLong(0);
    AtomicLong totalQueueWaitTime = new AtomicLong(0);
    CountDownLatch latch = new CountDownLatch(numThreads);
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);
      for (int i = 0; i < numThreads; i++) {
        executor.execute(new Runnable() {
          @Override
          public void run() {
            try {
              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
              scan.setEnableScanMetricsByRegion(true);
              scan.setCaching(2);
              try (ResultScanner rs = table.getScanner(scan)) {
                Result r;
                while ((r = rs.next()) != null) {
                  assertFalse(r.isEmpty());
                }
                ScanMetrics scanMetrics = rs.getScanMetrics();
                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
              }
            } catch (IOException e) {
              throw new RuntimeException(e);
            } finally {
              // Count down even on failure so that the main thread never waits forever
              latch.countDown();
            }
          }
        });
      }
      latch.await();
      executor.shutdown();
      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS),
        "Scanner threads did not terminate in time");
      assertTrue(totalScanRpcTime.get() > 0);
      assertTrue(totalQueueWaitTime.get() > 0);
    } finally {
      // No-op on the success path; stops the worker threads if the test failed midway
      executor.shutdownNow();
      TEST_UTIL.deleteTable(tableName);
    }
  }

  @TestTemplate
  public void testScanMetricsByRegionWithRegionMove() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 2 regions with start row keys: bbb and ccc
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] ccc = Bytes.toBytes("ccc");
      byte[] ddc = Bytes.toBytes("ddc");
      long expectedCountOfRowsScannedInMovedRegion = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
          expectedCountOfRowsScannedInMovedRegion++;
        }
      }
      byte[] movedRegion = null;
      ScanMetrics scanMetrics;

      // Initialize scan with maxResultSize as size of 50 rows.
      Scan scan = generateScan(bbb, ddc);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        for (Result r : rs) {
          byte[] row = r.getRow();
          // Move the region holding the very first row returned, while the scan is in progress
          if (isFirstScanOfRegion) {
            movedRegion = moveRegion(tableName, row);
            isFirstScanOfRegion = false;
          }
        }
        assertNotNull(movedRegion);

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();
        long actualCountOfRowsScannedInMovedRegion = 0;
        Set<ServerName> serversForMovedRegion = new HashSet<>();

        // 2 regions scanned with two entries for first region as it moved in b/w scan
        assertEquals(3, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
            actualCountOfRowsScannedInMovedRegion += rowsScanned;
            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());

            assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
          }
          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        assertEquals(expectedCountOfRowsScannedInMovedRegion,
          actualCountOfRowsScannedInMovedRegion);
        assertEquals(2, serversForMovedRegion.size());
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  @TestTemplate
  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 1 region with start row key: bbb
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] bmw = Bytes.toBytes("bmw");
      byte[] ccb = Bytes.toBytes("ccb");
      long expectedCountOfRowsScannedInRegion = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
          expectedCountOfRowsScannedInRegion++;
        }
      }
      ScanMetrics scanMetrics;
      Set<String> expectedSplitRegionRes = new HashSet<>();

      // Initialize scan
      Scan scan = generateScan(bbb, ccb);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        while (rs.next() != null) {
          // Split the region right after the first row has been returned
          if (isFirstScanOfRegion) {
            splitRegion(tableName, bbb, bmw)
              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
            isFirstScanOfRegion = false;
          }
        }

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();

        long actualCountOfRowsScannedInRegion = 0;
        long rpcRetriesCount = 0;
        Set<String> splitRegionRes = new HashSet<>();

        // 1 entry each for parent and two child regions
        assertEquals(3, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          actualCountOfRowsScannedInRegion += rowsScanned;
          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());

          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
            rpcRetriesCount++;
          }

          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
        assertEquals(2, rpcRetriesCount);
        assertEquals(expectedSplitRegionRes, splitRegionRes);
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  @TestTemplate
  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 2 regions with start row keys: bbb and ccc
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] ccc = Bytes.toBytes("ccc");
      byte[] ddc = Bytes.toBytes("ddc");
      long expectedCountOfRowsScannedInRegions = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
          expectedCountOfRowsScannedInRegions++;
        }
      }
      ScanMetrics scanMetrics;
      Set<String> expectedMergeRegionsRes = new HashSet<>();
      String mergedRegionEncodedName = null;

      // Initialize scan
      Scan scan = generateScan(bbb, ddc);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        while (rs.next() != null) {
          // Merge the two regions right after the first row has been returned
          if (isFirstScanOfRegion) {
            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
            // Entry with index 2 is the encoded region name of merged region
            mergedRegionEncodedName = Bytes.toString(out.get(2));
            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
            isFirstScanOfRegion = false;
          }
        }

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();
        long actualCountOfRowsScannedInRegions = 0;
        Set<String> mergeRegionsRes = new HashSet<>();
        boolean containsMergedRegionInScanMetrics = false;

        // 1 entry each for old region from which first row was scanned and new merged region
        assertEquals(2, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          actualCountOfRowsScannedInRegions += rowsScanned;
          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
            containsMergedRegionInScanMetrics = true;
          }

          assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
          assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
        assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
        assertTrue(containsMergedRegionInScanMetrics);
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Builds a task which, for as long as the latch has not been released, repeatedly collects (and
   * thereby resets) the per-region scan metrics, merges them into the given collection and then
   * sleeps for a short random interval.
   * @param scanMetrics                   Scan metrics to collect from.
   * @param scanMetricsByRegionCollection Destination into which collected metrics are merged. It
   *                                      is mutated from the thread running the task, so callers
   *                                      must not touch it until the task has finished.
   * @param latch                         Latch whose count reaching 0 stops the task.
   * @return The periodic metrics collector task.
   */
  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
    CountDownLatch latch) {
    return new Runnable() {
      @Override
      public void run() {
        try {
          while (latch.getCount() > 0) {
            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
              scanMetrics.collectMetricsByRegion();
            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
            Thread.sleep(RAND.nextInt(10));
          }
        } catch (InterruptedException e) {
          // Restore the interrupt status before propagating so that the pool can observe it
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    };
  }

  /**
   * Merges per-region scan metrics from the source map into the destination map by summing up the
   * values of each metric. The timing based metrics are removed from the source entries before
   * merging. Entries for regions not yet present in the destination are added as is, which means
   * the destination may end up sharing metric maps with the source.
   * @param srcMap Metrics to merge; its inner maps are modified by this method.
   * @param dstMap Accumulated metrics to merge into.
   */
  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      // Remove the timing based metrics as they are not deterministic
      metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
      Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
      if (dstMetricsMap == null) {
        dstMap.put(scanMetricsRegionInfo, metricsMap);
      } else {
        // merge() also copes with a metric which is not yet present in the destination
        metricsMap.forEach(
          (metricName, newValue) -> dstMetricsMap.merge(metricName, newValue, Long::sum));
      }
    }
  }

  /**
   * Moves the region containing the given row from its current region server to some other region
   * server and asserts that the hosting server changed.
   * @param tableName Name of the table to which the region belongs.
   * @param startRow  Row used to locate the region to be moved.
   * @return Encoded region name of the region which was moved.
   */
  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
      byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
      ServerName initialServerName = loc.getServerName();

      admin.move(encodedRegionName);

      ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();

      // Assert that region actually moved
      assertNotEquals(initialServerName, finalServerName);
      return encodedRegionName;
    }
  }

  /**
   * Splits the region containing the given row at the split key provided and waits for the split
   * to finish.
   * @param tableName Table name of region to be split.
   * @param startRow  Row used to locate the region to be split.
   * @param splitKey  Split key for splitting the region; must lie in the same region as startRow.
   * @return List of encoded region names with first element being parent region followed by two
   *         child regions.
   */
  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
    throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      ServerName initialTopServerName = topLoc.getServerName();
      HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
      ServerName initialBottomServerName = bottomLoc.getServerName();

      // Assert region is ready for split. Region names are byte arrays, so compare their contents
      // as assertEquals()/assertNotEquals() would only compare the array references.
      assertEquals(initialTopServerName, initialBottomServerName);
      assertTrue(Arrays.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName),
        "Start row and split key must belong to the same region before the split");

      FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));

      topLoc = regionLocator.getRegionLocation(startRow, true);
      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      bottomLoc = regionLocator.getRegionLocation(splitKey, true);
      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();

      // Assert that region split is complete
      assertFalse(Arrays.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
      assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedBottomRegionName));
      assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));

      return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
        finalEncodedBottomRegionName);
    }
  }

  /**
   * Merges the two regions containing the rows topRegion and bottomRegion and waits for the merge
   * to finish. Asserts that the regions to be merged are adjacent regions.
   * @param tableName    Table name of regions to be merged.
   * @param topRegion    Row used to locate the first region for merging.
   * @param bottomRegion Row used to locate the second region for merging.
   * @return List of encoded region names with first two elements being original regions followed by
   *         the merged region.
   */
  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
    throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
      HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
      String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());

      // Assert that regions are ready to be merged. Region names are byte arrays, so compare their
      // contents as assertEquals()/assertNotEquals() would only compare the array references.
      assertFalse(Arrays.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName),
        "Regions to be merged must be two distinct regions");
      assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);

      FutureUtils.get(admin.mergeRegionsAsync(
        new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));

      topLoc = regionLocator.getRegionLocation(topRegion, true);
      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();

      // Assert regions have been merged successfully
      assertTrue(Arrays.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName),
        "Both rows must be served by the merged region after the merge");
      assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedTopRegionName));
      assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));

      return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
        finalEncodedTopRegionName);
    }
  }
}