001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY; 021import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME; 022import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME; 023import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME; 024import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; 025import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME; 026import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME; 027import static org.junit.Assert.assertEquals; 028 029import java.io.IOException; 030import java.util.Arrays; 031import java.util.HashMap; 032import java.util.HashSet; 033import java.util.List; 034import java.util.Map; 035import java.util.Random; 036import java.util.Set; 037import java.util.concurrent.CountDownLatch; 038import java.util.concurrent.Executors; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042import java.util.concurrent.atomic.AtomicLong; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 051import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; 052import org.apache.hadoop.hbase.testclassification.ClientTests; 053import org.apache.hadoop.hbase.testclassification.LargeTests; 054import org.apache.hadoop.hbase.util.Bytes; 055import org.apache.hadoop.hbase.util.FutureUtils; 056import org.junit.AfterClass; 057import org.junit.Assert; 058import org.junit.BeforeClass; 059import org.junit.ClassRule; 060import org.junit.Test; 061import org.junit.experimental.categories.Category; 062import org.junit.runners.Parameterized.Parameter; 063import org.junit.runners.Parameterized.Parameters; 064 065@Category({ ClientTests.class, LargeTests.class }) 066public class TestTableScanMetrics extends FromClientSideBase { 067 @ClassRule 068 public static final HBaseClassTestRule CLASS_RULE = 069 HBaseClassTestRule.forClass(TestTableScanMetrics.class); 070 071 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 072 073 private static final TableName TABLE_NAME = 074 TableName.valueOf(TestTableScanMetrics.class.getSimpleName()); 075 076 private static final byte[] CF = Bytes.toBytes("cf"); 077 078 private static final byte[] CQ = Bytes.toBytes("cq"); 079 080 private static final byte[] VALUE = Bytes.toBytes("value"); 081 082 private static final Random RAND = new Random(11); 083 084 private static int NUM_REGIONS; 085 086 private static Connection CONN; 087 088 @Parameters(name = "{index}: scanner={0}") 089 public static List<Object[]> params() { 090 return Arrays.asList(new Object[] { "ForwardScanner", new Scan() }, 091 new Object[] { "ReverseScanner", new Scan().setReversed(true) }); 092 } 093 094 @Parameter(0) 095 public String scannerName; 096 097 @Parameter(1) 098 public Scan originalScan; 099 100 @BeforeClass 101 public static void setUp() throws Exception { 102 // Start the minicluster 103 TEST_UTIL.startMiniCluster(2); 104 // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that 105 // scan hits all the region and not all rows lie in a single region 106 try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) { 107 table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE), 108 new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE), 109 new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE))); 110 } 111 CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); 112 NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size(); 113 } 114 115 @AfterClass 116 public static void tearDown() throws Exception { 117 TEST_UTIL.shutdownMiniCluster(); 118 } 119 120 private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException { 121 Scan scan = new Scan(originalScan); 122 if (originalScan.isReversed()) { 123 scan.withStartRow(largerRow, true); 124 scan.withStopRow(smallerRow, true); 125 } else { 126 scan.withStartRow(smallerRow, true); 127 scan.withStopRow(largerRow, true); 128 } 129 return scan; 130 } 131 132 private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount) 133 throws IOException { 134 int countOfRows = 0; 135 ScanMetrics scanMetrics; 136 try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) { 137 for (Result result : scanner) { 138 Assert.assertFalse(result.isEmpty()); 139 countOfRows++; 140 } 141 scanMetrics = scanner.getScanMetrics(); 142 } 143 Assert.assertEquals(expectedCount, countOfRows); 144 return scanMetrics; 145 } 146 147 @Test 148 public void testScanMetricsDisabled() throws Exception { 149 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 150 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3); 151 Assert.assertNull(scanMetrics); 152 } 153 154 @Test 155 public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception { 156 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 157 scan.setScanMetricsEnabled(true); 158 int expectedRowsScanned = 3; 159 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 160 Assert.assertNotNull(scanMetrics); 161 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false); 162 // The test setup is such that we have 1 row per region in the scan range 163 Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get()); 164 Assert.assertEquals(expectedRowsScanned, 165 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 166 Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty()); 167 } 168 169 @Test 170 public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception { 171 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 172 scan.setScanMetricsEnabled(true); 173 int expectedRowsScanned = 3; 174 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 175 Assert.assertNotNull(scanMetrics); 176 // By default counters are collected with reset as true 177 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(); 178 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 179 Assert.assertEquals(expectedRowsScanned, 180 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 181 // Subsequent call to get scan metrics map should show all counters as 0 182 Assert.assertEquals(0, scanMetrics.countOfRegions.get()); 183 Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get()); 184 } 185 186 @Test 187 public void testScanMetricsByRegionForSingleRegionScan() throws Exception { 188 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1")); 189 scan.setEnableScanMetricsByRegion(true); 190 int expectedRowsScanned = 1; 191 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 192 Assert.assertNotNull(scanMetrics); 193 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false); 194 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 195 Assert.assertEquals(expectedRowsScanned, 196 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 197 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 198 scanMetrics.collectMetricsByRegion(false); 199 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 200 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 201 .entrySet()) { 202 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 203 metricsMap = entry.getValue(); 204 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 205 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 206 // As we are scanning single row so, overall scan metrics will match per region scan metrics 207 Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 208 Assert.assertEquals(expectedRowsScanned, 209 (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 210 } 211 } 212 213 @Test 214 public void testScanMetricsByRegionForMultiRegionScan() throws Exception { 215 Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 216 scan.setEnableScanMetricsByRegion(true); 217 int expectedRowsScanned = 3; 218 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 219 Assert.assertNotNull(scanMetrics); 220 Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get()); 221 Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get()); 222 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 223 scanMetrics.collectMetricsByRegion(false); 224 Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size()); 225 int rowsScannedAcrossAllRegions = 0; 226 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 227 .entrySet()) { 228 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 229 Map<String, Long> metricsMap = entry.getValue(); 230 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 231 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 232 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 233 if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) { 234 rowsScannedAcrossAllRegions++; 235 } else { 236 assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 237 } 238 } 239 Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions); 240 } 241 242 @Test 243 public void testScanMetricsByRegionReset() throws Exception { 244 Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1")); 245 scan.setEnableScanMetricsByRegion(true); 246 int expectedRowsScanned = 3; 247 ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned); 248 Assert.assertNotNull(scanMetrics); 249 250 // Retrieve scan metrics by region as a map and reset 251 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 252 scanMetrics.collectMetricsByRegion(); 253 // We scan 1 row per region 254 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 255 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 256 .entrySet()) { 257 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 258 Map<String, Long> metricsMap = entry.getValue(); 259 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 260 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 261 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 262 Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 263 } 264 265 // Scan metrics have already been reset and now all counters should be 0 266 scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false); 267 // Size of map should be same as earlier 268 Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size()); 269 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 270 .entrySet()) { 271 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 272 Map<String, Long> metricsMap = entry.getValue(); 273 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 274 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 275 // Counters should have been reset to 0 276 Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 277 Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); 278 } 279 } 280 281 @Test 282 public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception { 283 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); 284 TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName() 285 + "_testConcurrentUpdatesAndResetToScanMetricsByRegion"); 286 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 287 TEST_UTIL.loadTable(table, CF); 288 289 Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>(); 290 291 // Trigger two concurrent threads one of which scans the table and other periodically 292 // collects the scan metrics (along with resetting the counters to 0). 293 Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 294 scan.setEnableScanMetricsByRegion(true); 295 scan.setCaching(2); 296 try (ResultScanner rs = table.getScanner(scan)) { 297 ScanMetrics scanMetrics = rs.getScanMetrics(); 298 AtomicInteger rowsScanned = new AtomicInteger(0); 299 CountDownLatch latch = new CountDownLatch(1); 300 Runnable tableScanner = new Runnable() { 301 public void run() { 302 for (Result r : rs) { 303 Assert.assertFalse(r.isEmpty()); 304 rowsScanned.incrementAndGet(); 305 } 306 latch.countDown(); 307 } 308 }; 309 Runnable metricsCollector = 310 getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch); 311 executor.execute(tableScanner); 312 executor.execute(metricsCollector); 313 latch.await(); 314 // Merge leftover scan metrics 315 mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(), 316 concurrentScanMetricsByRegion); 317 Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get()); 318 } 319 320 Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion; 321 322 // Collect scan metrics by region from single thread. Assert that concurrent scan 323 // and metrics collection works as expected. 324 scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 325 scan.setEnableScanMetricsByRegion(true); 326 scan.setCaching(2); 327 try (ResultScanner rs = table.getScanner(scan)) { 328 ScanMetrics scanMetrics = rs.getScanMetrics(); 329 int rowsScanned = 0; 330 for (Result r : rs) { 331 Assert.assertFalse(r.isEmpty()); 332 rowsScanned++; 333 } 334 Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned); 335 expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion(); 336 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion 337 .entrySet()) { 338 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 339 Map<String, Long> metricsMap = entry.getValue(); 340 // Remove millis between nexts metric as it is not deterministic 341 metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); 342 metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); 343 metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); 344 Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName()); 345 Assert.assertNotNull(scanMetricsRegionInfo.getServerName()); 346 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 347 // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row 348 long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 349 Assert.assertTrue( 350 rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1)); 351 } 352 } 353 354 // Assert on scan metrics by region 355 Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion); 356 } finally { 357 TEST_UTIL.deleteTable(tableName); 358 } 359 } 360 361 @Test 362 public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception { 363 final int numThreads = 20; 364 Configuration conf = TEST_UTIL.getConfiguration(); 365 // Handler count is 3 by default. 366 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 367 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); 368 // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6 369 // times the handler count. 370 Assert.assertTrue(numThreads > 6 * handlerCount); 371 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads); 372 TableName tableName = TableName.valueOf( 373 TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics"); 374 AtomicLong totalScanRpcTime = new AtomicLong(0); 375 AtomicLong totalQueueWaitTime = new AtomicLong(0); 376 CountDownLatch latch = new CountDownLatch(numThreads); 377 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 378 TEST_UTIL.loadTable(table, CF); 379 for (int i = 0; i < numThreads; i++) { 380 executor.execute(new Runnable() { 381 @Override 382 public void run() { 383 try { 384 Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); 385 scan.setEnableScanMetricsByRegion(true); 386 scan.setCaching(2); 387 try (ResultScanner rs = table.getScanner(scan)) { 388 Result r; 389 while ((r = rs.next()) != null) { 390 Assert.assertFalse(r.isEmpty()); 391 } 392 ScanMetrics scanMetrics = rs.getScanMetrics(); 393 Map<String, Long> metricsMap = scanMetrics.getMetricsMap(); 394 totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME)); 395 totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME)); 396 } 397 latch.countDown(); 398 } catch (IOException e) { 399 throw new RuntimeException(e); 400 } 401 } 402 }); 403 } 404 latch.await(); 405 executor.shutdown(); 406 executor.awaitTermination(10, TimeUnit.SECONDS); 407 Assert.assertTrue(totalScanRpcTime.get() > 0); 408 Assert.assertTrue(totalQueueWaitTime.get() > 0); 409 } finally { 410 TEST_UTIL.deleteTable(tableName); 411 } 412 } 413 414 @Test 415 public void testScanMetricsByRegionWithRegionMove() throws Exception { 416 TableName tableName = TableName.valueOf( 417 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove"); 418 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 419 TEST_UTIL.loadTable(table, CF); 420 421 // Scan 2 regions with start row keys: bbb and ccc 422 byte[] bbb = Bytes.toBytes("bbb"); 423 byte[] ccc = Bytes.toBytes("ccc"); 424 byte[] ddc = Bytes.toBytes("ddc"); 425 long expectedCountOfRowsScannedInMovedRegion = 0; 426 // ROWS is the data loaded by loadTable() 427 for (byte[] row : HBaseTestingUtil.ROWS) { 428 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) { 429 expectedCountOfRowsScannedInMovedRegion++; 430 } 431 } 432 byte[] movedRegion = null; 433 ScanMetrics scanMetrics; 434 435 // Initialize scan with maxResultSize as size of 50 rows. 436 Scan scan = generateScan(bbb, ddc); 437 scan.setEnableScanMetricsByRegion(true); 438 scan.setMaxResultSize(8000); 439 440 try (ResultScanner rs = table.getScanner(scan)) { 441 boolean isFirstScanOfRegion = true; 442 for (Result r : rs) { 443 byte[] row = r.getRow(); 444 if (isFirstScanOfRegion) { 445 movedRegion = moveRegion(tableName, row); 446 isFirstScanOfRegion = false; 447 } 448 } 449 Assert.assertNotNull(movedRegion); 450 451 scanMetrics = rs.getScanMetrics(); 452 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 453 scanMetrics.collectMetricsByRegion(); 454 long actualCountOfRowsScannedInMovedRegion = 0; 455 Set<ServerName> serversForMovedRegion = new HashSet<>(); 456 457 // 2 regions scanned with two entries for first region as it moved in b/w scan 458 Assert.assertEquals(3, scanMetricsByRegion.size()); 459 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 460 .entrySet()) { 461 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 462 Map<String, Long> metricsMap = entry.getValue(); 463 if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) { 464 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 465 actualCountOfRowsScannedInMovedRegion += rowsScanned; 466 serversForMovedRegion.add(scanMetricsRegionInfo.getServerName()); 467 468 Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME)); 469 } 470 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 471 } 472 Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion, 473 actualCountOfRowsScannedInMovedRegion); 474 Assert.assertEquals(2, serversForMovedRegion.size()); 475 } 476 } finally { 477 TEST_UTIL.deleteTable(tableName); 478 } 479 } 480 481 @Test 482 public void testScanMetricsByRegionWithRegionSplit() throws Exception { 483 TableName tableName = TableName.valueOf( 484 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit"); 485 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 486 TEST_UTIL.loadTable(table, CF); 487 488 // Scan 1 region with start row key: bbb 489 byte[] bbb = Bytes.toBytes("bbb"); 490 byte[] bmw = Bytes.toBytes("bmw"); 491 byte[] ccb = Bytes.toBytes("ccb"); 492 long expectedCountOfRowsScannedInRegion = 0; 493 // ROWS is the data loaded by loadTable() 494 for (byte[] row : HBaseTestingUtil.ROWS) { 495 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) { 496 expectedCountOfRowsScannedInRegion++; 497 } 498 } 499 ScanMetrics scanMetrics; 500 Set<String> expectedSplitRegionRes = new HashSet<>(); 501 502 // Initialize scan 503 Scan scan = generateScan(bbb, ccb); 504 scan.setEnableScanMetricsByRegion(true); 505 scan.setMaxResultSize(8000); 506 507 try (ResultScanner rs = table.getScanner(scan)) { 508 boolean isFirstScanOfRegion = true; 509 for (Result r : rs) { 510 if (isFirstScanOfRegion) { 511 splitRegion(tableName, bbb, bmw) 512 .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region))); 513 isFirstScanOfRegion = false; 514 } 515 } 516 517 scanMetrics = rs.getScanMetrics(); 518 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 519 scanMetrics.collectMetricsByRegion(); 520 521 long actualCountOfRowsScannedInRegion = 0; 522 long rpcRetiesCount = 0; 523 Set<String> splitRegionRes = new HashSet<>(); 524 525 // 1 entry each for parent and two child regions 526 Assert.assertEquals(3, scanMetricsByRegion.size()); 527 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 528 .entrySet()) { 529 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 530 Map<String, Long> metricsMap = entry.getValue(); 531 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 532 actualCountOfRowsScannedInRegion += rowsScanned; 533 splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName()); 534 535 if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) { 536 rpcRetiesCount++; 537 } 538 539 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 540 } 541 Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion); 542 Assert.assertEquals(2, rpcRetiesCount); 543 Assert.assertEquals(expectedSplitRegionRes, splitRegionRes); 544 } 545 } finally { 546 TEST_UTIL.deleteTable(tableName); 547 } 548 } 549 550 @Test 551 public void testScanMetricsByRegionWithRegionMerge() throws Exception { 552 TableName tableName = TableName.valueOf( 553 TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge"); 554 try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) { 555 TEST_UTIL.loadTable(table, CF); 556 557 // Scan 2 regions with start row keys: bbb and ccc 558 byte[] bbb = Bytes.toBytes("bbb"); 559 byte[] ccc = Bytes.toBytes("ccc"); 560 byte[] ddc = Bytes.toBytes("ddc"); 561 long expectedCountOfRowsScannedInRegions = 0; 562 // ROWS is the data loaded by loadTable() 563 for (byte[] row : HBaseTestingUtil.ROWS) { 564 if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) { 565 expectedCountOfRowsScannedInRegions++; 566 } 567 } 568 ScanMetrics scanMetrics; 569 Set<String> expectedMergeRegionsRes = new HashSet<>(); 570 String mergedRegionEncodedName = null; 571 572 // Initialize scan 573 Scan scan = generateScan(bbb, ddc); 574 scan.setEnableScanMetricsByRegion(true); 575 scan.setMaxResultSize(8000); 576 577 try (ResultScanner rs = table.getScanner(scan)) { 578 boolean isFirstScanOfRegion = true; 579 for (Result r : rs) { 580 if (isFirstScanOfRegion) { 581 List<byte[]> out = mergeRegions(tableName, bbb, ccc); 582 // Entry with index 2 is the encoded region name of merged region 583 mergedRegionEncodedName = Bytes.toString(out.get(2)); 584 out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region))); 585 isFirstScanOfRegion = false; 586 } 587 } 588 589 scanMetrics = rs.getScanMetrics(); 590 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 591 scanMetrics.collectMetricsByRegion(); 592 long actualCountOfRowsScannedInRegions = 0; 593 Set<String> mergeRegionsRes = new HashSet<>(); 594 boolean containsMergedRegionInScanMetrics = false; 595 596 // 1 entry each for old region from which first row was scanned and new merged region 597 Assert.assertEquals(2, scanMetricsByRegion.size()); 598 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 599 .entrySet()) { 600 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 601 Map<String, Long> metricsMap = entry.getValue(); 602 long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME); 603 actualCountOfRowsScannedInRegions += rowsScanned; 604 mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName()); 605 if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) { 606 containsMergedRegionInScanMetrics = true; 607 } 608 609 Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME)); 610 Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME)); 611 } 612 Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions); 613 Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes)); 614 Assert.assertTrue(containsMergedRegionInScanMetrics); 615 } 616 } finally { 617 TEST_UTIL.deleteTable(tableName); 618 } 619 } 620 621 private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics, 622 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection, 623 CountDownLatch latch) { 624 return new Runnable() { 625 public void run() { 626 try { 627 while (latch.getCount() > 0) { 628 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 629 scanMetrics.collectMetricsByRegion(); 630 mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection); 631 Thread.sleep(RAND.nextInt(10)); 632 } 633 } catch (InterruptedException e) { 634 throw new RuntimeException(e); 635 } 636 } 637 }; 638 } 639 640 private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap, 641 Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) { 642 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) { 643 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 644 Map<String, Long> metricsMap = entry.getValue(); 645 // Remove millis between nexts metric as it is not deterministic 646 metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME); 647 metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME); 648 metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME); 649 if (dstMap.containsKey(scanMetricsRegionInfo)) { 650 Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo); 651 for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) { 652 String metricName = metricEntry.getKey(); 653 Long existingValue = dstMetricsMap.get(metricName); 654 Long newValue = metricEntry.getValue(); 655 dstMetricsMap.put(metricName, existingValue + newValue); 656 } 657 } else { 658 dstMap.put(scanMetricsRegionInfo, metricsMap); 659 } 660 } 661 } 662 663 /** 664 * Moves the region with start row key from its original region server to some other region 665 * server. This is a synchronous method. 666 * @param tableName Table name of region to be moved belongs. 667 * @param startRow Start row key of the region to be moved. 668 * @return Encoded region name of the region which was moved. 669 */ 670 private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException { 671 Admin admin = TEST_UTIL.getAdmin(); 672 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 673 HRegionLocation loc = regionLocator.getRegionLocation(startRow, true); 674 byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes(); 675 ServerName initialServerName = loc.getServerName(); 676 677 admin.move(encodedRegionName); 678 679 ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName(); 680 681 // Assert that region actually moved 682 Assert.assertNotEquals(initialServerName, finalServerName); 683 return encodedRegionName; 684 } 685 686 /** 687 * Splits the region with start row key at the split key provided. This is a synchronous method. 688 * @param tableName Table name of region to be split. 689 * @param startRow Start row key of the region to be split. 690 * @param splitKey Split key for splitting the region. 691 * @return List of encoded region names with first element being parent region followed by two 692 * child regions. 693 */ 694 private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey) 695 throws IOException { 696 Admin admin = TEST_UTIL.getAdmin(); 697 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 698 HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true); 699 byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 700 ServerName initialTopServerName = topLoc.getServerName(); 701 HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true); 702 byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 703 ServerName initialBottomServerName = bottomLoc.getServerName(); 704 705 // Assert region is ready for split 706 Assert.assertEquals(initialTopServerName, initialBottomServerName); 707 Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName); 708 709 FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey)); 710 711 topLoc = regionLocator.getRegionLocation(startRow, true); 712 byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 713 bottomLoc = regionLocator.getRegionLocation(splitKey, true); 714 byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 715 716 // Assert that region split is complete 717 Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName); 718 Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName); 719 Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName); 720 721 return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName, 722 finalEncodedBottomRegionName); 723 } 724 725 /** 726 * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the 727 * regions to be merged are adjacent regions. This is a synchronous method. 728 * @param tableName Table name of regions to be merged. 729 * @param topRegion Start row key of first region for merging. 730 * @param bottomRegion Start row key of second region for merging. 731 * @return List of encoded region names with first two elements being original regions followed by 732 * the merged region. 733 */ 734 private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion) 735 throws IOException { 736 Admin admin = TEST_UTIL.getAdmin(); 737 RegionLocator regionLocator = CONN.getRegionLocator(tableName); 738 HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true); 739 byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 740 String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey()); 741 HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true); 742 byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 743 String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey()); 744 745 // Assert that regions are ready to be merged 746 Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName); 747 Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey); 748 749 FutureUtils.get(admin.mergeRegionsAsync( 750 new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false)); 751 752 topLoc = regionLocator.getRegionLocation(topRegion, true); 753 byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes(); 754 bottomLoc = regionLocator.getRegionLocation(bottomRegion, true); 755 byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes(); 756 757 // Assert regions have been merges successfully 758 Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName); 759 Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName); 760 Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName); 761 762 return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName, 763 finalEncodedTopRegionName); 764 } 765}