/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

/**
 * Tests the {@link ScanMetrics} reported by a {@link ResultScanner}, both the overall counters
 * and the per-region breakdown returned by {@link ScanMetrics#collectMetricsByRegion()}. Every
 * test builds its scan from {@link #originalScan}, which {@link #params()} supplies as either a
 * forward or a reversed {@link Scan}.
 * <p>
 * NOTE(review): this class declares {@code @Parameter}/{@code @Parameters} but no
 * {@code @RunWith}; presumably the parameterized runner is inherited from
 * {@link FromClientSideBase} -- confirm.
 */
@Category({ ClientTests.class, LargeTests.class })
public class TestTableScanMetrics extends FromClientSideBase {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestTableScanMetrics.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private static final TableName TABLE_NAME =
    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  // Fixed seed so that the sleep pattern of the periodic metrics collector is reproducible
  private static final Random RAND = new Random(11);

  private static int NUM_REGIONS;

  private static Connection CONN;

  /** Returns the scan variants (forward and reversed) every test is run against. */
  @Parameters(name = "{index}: scanner={0}")
  public static List<Object[]> params() {
    return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
      new Object[] { "ReverseScanner", new Scan().setReversed(true) });
  }

  @Parameter(0)
  public String scannerName;

  @Parameter(1)
  public Scan originalScan;

  /**
   * Starts the mini cluster, creates the shared multi-region table with three rows and opens the
   * connection shared by all tests.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    // Start the minicluster
    TEST_UTIL.startMiniCluster(2);
    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
    // scan hits all the region and not all rows lie in a single region
    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
    }
    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
  }

  /** Closes the shared connection and shuts the mini cluster down. */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      // The connection is opened in setUp() and must be released before the cluster goes away
      if (CONN != null) {
        CONN.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Copies {@link #originalScan} and bounds it (both ends inclusive) by the two rows, swapping
   * start and stop row when the scan is reversed.
   * @param smallerRow Lexicographically smaller boundary row.
   * @param largerRow  Lexicographically larger boundary row.
   * @return A new scan covering the range between the two rows.
   */
  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
    Scan scan = new Scan(originalScan);
    if (originalScan.isReversed()) {
      scan.withStartRow(largerRow, true);
      scan.withStopRow(smallerRow, true);
    } else {
      scan.withStartRow(smallerRow, true);
      scan.withStopRow(largerRow, true);
    }
    return scan;
  }

  /**
   * Runs the scan against the shared table, asserts that every result is non-empty and that
   * exactly {@code expectedCount} rows come back.
   * @param scan          Scan to run.
   * @param expectedCount Number of rows the scan must return.
   * @return Scan metrics as reported by the scanner once it has been drained.
   */
  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
    throws IOException {
    int countOfRows = 0;
    ScanMetrics scanMetrics;
    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        Assert.assertFalse(result.isEmpty());
        countOfRows++;
      }
      scanMetrics = scanner.getScanMetrics();
    }
    Assert.assertEquals(expectedCount, countOfRows);
    return scanMetrics;
  }

  /** A scan which did not enable scan metrics reports no metrics at all. */
  @Test
  public void testScanMetricsDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
    Assert.assertNull(scanMetrics);
  }

  /** With only overall scan metrics enabled, the per-region breakdown stays empty. */
  @Test
  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setScanMetricsEnabled(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    Assert.assertNotNull(scanMetrics);
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
    // The test setup is such that we have 1 row per region in the scan range
    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
    Assert.assertEquals(expectedRowsScanned,
      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
  }

  /** Fetching the metrics map with the default arguments resets the overall counters. */
  @Test
  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setScanMetricsEnabled(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    Assert.assertNotNull(scanMetrics);
    // By default counters are collected with reset as true
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
    Assert.assertEquals(expectedRowsScanned,
      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    // Subsequent call to get scan metrics map should show all counters as 0
    Assert.assertEquals(0, scanMetrics.countOfRegions.get());
    Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
  }

  /** For a single-row scan the per-region metrics match the overall metrics. */
  @Test
  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 1;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    Assert.assertNotNull(scanMetrics);
    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
    Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
    Assert.assertEquals(expectedRowsScanned,
      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      metricsMap = entry.getValue();
      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
      // As we are scanning single row so, overall scan metrics will match per region scan metrics
      Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      Assert.assertEquals(expectedRowsScanned,
        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }
  }

  /** A full-table scan reports one per-region entry for every region of the table. */
  @Test
  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    Assert.assertNotNull(scanMetrics);
    Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion(false);
    Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
    int rowsScannedAcrossAllRegions = 0;
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      // A region either holds exactly one of the three rows or none of them
      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
        rowsScannedAcrossAllRegions++;
      } else {
        assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
      }
    }
    Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
  }

  /** Collecting per-region metrics with reset zeroes the counters but keeps the region entries. */
  @Test
  public void testScanMetricsByRegionReset() throws Exception {
    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
    scan.setEnableScanMetricsByRegion(true);
    int expectedRowsScanned = 3;
    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
    Assert.assertNotNull(scanMetrics);

    // Retrieve scan metrics by region as a map and reset
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
      scanMetrics.collectMetricsByRegion();
    // We scan 1 row per region
    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
      Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }

    // Scan metrics have already been reset and now all counters should be 0
    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
    // Size of map should be same as earlier
    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
      .entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
      // Counters should have been reset to 0
      Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
      Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
    }
  }

  /**
   * Scans a table on one thread while another thread periodically collects and resets the
   * per-region metrics, then checks that the merged collections equal what a plain
   * single-threaded scan reports.
   */
  @Test
  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
    TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();

      // Trigger two concurrent threads one of which scans the table and other periodically
      // collects the scan metrics (along with resetting the counters to 0).
      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
      scan.setEnableScanMetricsByRegion(true);
      scan.setCaching(2);
      try (ResultScanner rs = table.getScanner(scan)) {
        ScanMetrics scanMetrics = rs.getScanMetrics();
        AtomicInteger rowsScanned = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(1);
        Runnable tableScanner = new Runnable() {
          @Override
          public void run() {
            try {
              for (Result r : rs) {
                Assert.assertFalse(r.isEmpty());
                rowsScanned.incrementAndGet();
              }
            } finally {
              // Always release the waiting test thread, even if the scan failed; the row count
              // assertion below then reports the failure instead of the test hanging.
              latch.countDown();
            }
          }
        };
        Runnable metricsCollector =
          getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
        executor.execute(tableScanner);
        executor.execute(metricsCollector);
        latch.await();
        // Wait for the collector thread to finish before touching the (non thread-safe) map it
        // writes to, otherwise the leftover merge below races with its last iteration.
        executor.shutdown();
        Assert.assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS));
        // Merge leftover scan metrics
        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
          concurrentScanMetricsByRegion);
        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
      }

      Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;

      // Collect scan metrics by region from single thread. Assert that concurrent scan
      // and metrics collection works as expected.
      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
      scan.setEnableScanMetricsByRegion(true);
      scan.setCaching(2);
      try (ResultScanner rs = table.getScanner(scan)) {
        ScanMetrics scanMetrics = rs.getScanMetrics();
        int rowsScanned = 0;
        for (Result r : rs) {
          Assert.assertFalse(r.isEmpty());
          rowsScanned++;
        }
        Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          // Timing metrics differ from run to run, so they are excluded from the comparison
          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
          // Each region will have 26 * 26 + 26 + 1 rows except last region which will have 1 row
          long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          Assert.assertTrue(
            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
        }
      }

      // Assert on scan metrics by region
      Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
    } finally {
      // No-op after a clean shutdown; stops the worker threads if the test failed midway
      executor.shutdownNow();
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Runs many concurrent scans so that scan RPCs queue up, and asserts that the accumulated RPC
   * processing time and queue wait time metrics are both positive.
   */
  @Test
  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
    final int numThreads = 20;
    Configuration conf = TEST_UTIL.getConfiguration();
    // Handler count is 3 by default.
    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
    // times the handler count.
    Assert.assertTrue(numThreads > 6 * handlerCount);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
    AtomicLong totalScanRpcTime = new AtomicLong(0);
    AtomicLong totalQueueWaitTime = new AtomicLong(0);
    // Counts worker threads which did not complete their scan
    AtomicInteger failedScans = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(numThreads);
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);
      for (int i = 0; i < numThreads; i++) {
        executor.execute(new Runnable() {
          @Override
          public void run() {
            boolean succeeded = false;
            try {
              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
              scan.setEnableScanMetricsByRegion(true);
              scan.setCaching(2);
              try (ResultScanner rs = table.getScanner(scan)) {
                Result r;
                while ((r = rs.next()) != null) {
                  Assert.assertFalse(r.isEmpty());
                }
                ScanMetrics scanMetrics = rs.getScanMetrics();
                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
              }
              succeeded = true;
            } catch (IOException e) {
              throw new RuntimeException(e);
            } finally {
              if (!succeeded) {
                failedScans.incrementAndGet();
              }
              // Count down on every path so that a failed worker cannot hang the test thread
              latch.countDown();
            }
          }
        });
      }
      latch.await();
      executor.shutdown();
      executor.awaitTermination(10, TimeUnit.SECONDS);
      Assert.assertEquals(0, failedScans.get());
      Assert.assertTrue(totalScanRpcTime.get() > 0);
      Assert.assertTrue(totalQueueWaitTime.get() > 0);
    } finally {
      // No-op after a clean shutdown; stops the worker threads if the test failed midway
      executor.shutdownNow();
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Moves a region to another server in the middle of a scan and asserts that the per-region
   * metrics contain one entry per server for the moved region.
   */
  @Test
  public void testScanMetricsByRegionWithRegionMove() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 2 regions with start row keys: bbb and ccc
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] ccc = Bytes.toBytes("ccc");
      byte[] ddc = Bytes.toBytes("ddc");
      long expectedCountOfRowsScannedInMovedRegion = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
          expectedCountOfRowsScannedInMovedRegion++;
        }
      }
      byte[] movedRegion = null;
      ScanMetrics scanMetrics;

      // Initialize scan with maxResultSize as size of 50 rows.
      Scan scan = generateScan(bbb, ddc);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        for (Result r : rs) {
          byte[] row = r.getRow();
          // Move the region holding the very first row returned, while the scan is in progress
          if (isFirstScanOfRegion) {
            movedRegion = moveRegion(tableName, row);
            isFirstScanOfRegion = false;
          }
        }
        Assert.assertNotNull(movedRegion);

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();
        long actualCountOfRowsScannedInMovedRegion = 0;
        Set<ServerName> serversForMovedRegion = new HashSet<>();

        // 2 regions scanned, with two entries for the moved region as it moved during the scan
        Assert.assertEquals(3, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
            long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
            actualCountOfRowsScannedInMovedRegion += rowsScanned;
            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());

            Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
          }
          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
          actualCountOfRowsScannedInMovedRegion);
        Assert.assertEquals(2, serversForMovedRegion.size());
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Splits a region in the middle of a scan and asserts that the per-region metrics contain one
   * entry each for the parent region and its two child regions.
   */
  @Test
  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 1 region with start row key: bbb
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] bmw = Bytes.toBytes("bmw");
      byte[] ccb = Bytes.toBytes("ccb");
      long expectedCountOfRowsScannedInRegion = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
          expectedCountOfRowsScannedInRegion++;
        }
      }
      ScanMetrics scanMetrics;
      Set<String> expectedSplitRegionRes = new HashSet<>();

      // Initialize scan
      Scan scan = generateScan(bbb, ccb);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        for (Result r : rs) {
          // Split the region as soon as the first row is returned, while the scan is in progress
          if (isFirstScanOfRegion) {
            splitRegion(tableName, bbb, bmw)
              .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
            isFirstScanOfRegion = false;
          }
        }

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();

        long actualCountOfRowsScannedInRegion = 0;
        long rpcRetriesCount = 0;
        Set<String> splitRegionRes = new HashSet<>();

        // 1 entry each for parent and two child regions
        Assert.assertEquals(3, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          actualCountOfRowsScannedInRegion += rowsScanned;
          splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());

          if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
            rpcRetriesCount++;
          }

          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
        Assert.assertEquals(2, rpcRetriesCount);
        Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Merges two adjacent regions in the middle of a scan and asserts that the per-region metrics
   * contain one entry for the old region the scan started in and one for the merged region.
   */
  @Test
  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
    TableName tableName = TableName.valueOf(
      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
      TEST_UTIL.loadTable(table, CF);

      // Scan 2 regions with start row keys: bbb and ccc
      byte[] bbb = Bytes.toBytes("bbb");
      byte[] ccc = Bytes.toBytes("ccc");
      byte[] ddc = Bytes.toBytes("ddc");
      long expectedCountOfRowsScannedInRegions = 0;
      // ROWS is the data loaded by loadTable()
      for (byte[] row : HBaseTestingUtil.ROWS) {
        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
          expectedCountOfRowsScannedInRegions++;
        }
      }
      ScanMetrics scanMetrics;
      Set<String> expectedMergeRegionsRes = new HashSet<>();
      String mergedRegionEncodedName = null;

      // Initialize scan
      Scan scan = generateScan(bbb, ddc);
      scan.setEnableScanMetricsByRegion(true);
      scan.setMaxResultSize(8000);

      try (ResultScanner rs = table.getScanner(scan)) {
        boolean isFirstScanOfRegion = true;
        for (Result r : rs) {
          // Merge the regions as soon as the first row is returned, while the scan is in progress
          if (isFirstScanOfRegion) {
            List<byte[]> out = mergeRegions(tableName, bbb, ccc);
            // Entry with index 2 is the encoded region name of merged region
            mergedRegionEncodedName = Bytes.toString(out.get(2));
            out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
            isFirstScanOfRegion = false;
          }
        }

        scanMetrics = rs.getScanMetrics();
        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
          scanMetrics.collectMetricsByRegion();
        long actualCountOfRowsScannedInRegions = 0;
        Set<String> mergeRegionsRes = new HashSet<>();
        boolean containsMergedRegionInScanMetrics = false;

        // 1 entry each for old region from which first row was scanned and new merged region
        Assert.assertEquals(2, scanMetricsByRegion.size());
        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
          .entrySet()) {
          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
          Map<String, Long> metricsMap = entry.getValue();
          long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
          actualCountOfRowsScannedInRegions += rowsScanned;
          mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
          if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
            containsMergedRegionInScanMetrics = true;
          }

          Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
          Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
        }
        Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
        Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
        Assert.assertTrue(containsMergedRegionInScanMetrics);
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Builds a task which, for as long as the latch has not been counted down, collects (and
   * thereby resets) the per-region scan metrics, merges them into the given collection and then
   * sleeps for a random interval of less than 10 ms.
   * @param scanMetrics                   Scan metrics to collect from.
   * @param scanMetricsByRegionCollection Destination the collected metrics are merged into. It is
   *                                      written from the thread running the task, so callers
   *                                      must not touch it until that task has finished.
   * @param latch                         Latch signalling that the scan has completed.
   * @return The collector task.
   */
  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
    CountDownLatch latch) {
    return new Runnable() {
      @Override
      public void run() {
        try {
          while (latch.getCount() > 0) {
            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
              scanMetrics.collectMetricsByRegion();
            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
            Thread.sleep(RAND.nextInt(10));
          }
        } catch (InterruptedException e) {
          // Restore the interrupt status for whoever owns this thread before bailing out
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    };
  }

  /**
   * Adds the per-region metrics of {@code srcMap} to {@code dstMap}, summing metric values of
   * regions present in both. The RPC processing time and queue wait time metrics are removed
   * from the source maps first as they are not comparable across scans. Note that the inner maps
   * of {@code srcMap} are modified and may end up being shared with {@code dstMap}.
   * @param srcMap Metrics to merge from.
   * @param dstMap Metrics to merge into.
   */
  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
      Map<String, Long> metricsMap = entry.getValue();
      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
      Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
      if (dstMetricsMap == null) {
        dstMap.put(scanMetricsRegionInfo, metricsMap);
        continue;
      }
      for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
        // merge() also copes with a metric which is not yet present in the destination
        dstMetricsMap.merge(metricEntry.getKey(), metricEntry.getValue(), Long::sum);
      }
    }
  }

  /**
   * Moves the region with start row key from its original region server to some other region
   * server. This is a synchronous method.
   * @param tableName Name of the table the region to be moved belongs to.
   * @param startRow  Start row key of the region to be moved.
   * @return Encoded region name of the region which was moved.
   */
  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
      byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
      ServerName initialServerName = loc.getServerName();

      admin.move(encodedRegionName);

      ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();

      // Assert that region actually moved
      Assert.assertNotEquals(initialServerName, finalServerName);
      return encodedRegionName;
    }
  }

  /**
   * Splits the region with start row key at the split key provided. This is a synchronous method.
   * @param tableName Table name of region to be split.
   * @param startRow  Start row key of the region to be split.
   * @param splitKey  Split key for splitting the region.
   * @return List of encoded region names with first element being parent region followed by two
   *         child regions.
   */
  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
    throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      ServerName initialTopServerName = topLoc.getServerName();
      HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
      ServerName initialBottomServerName = bottomLoc.getServerName();

      // Assert region is ready for split. Region names are byte arrays and have to be compared
      // by content; assertEquals()/assertNotEquals() would only compare the references.
      Assert.assertEquals(initialTopServerName, initialBottomServerName);
      Assert.assertArrayEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);

      FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));

      topLoc = regionLocator.getRegionLocation(startRow, true);
      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      bottomLoc = regionLocator.getRegionLocation(splitKey, true);
      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();

      // Assert that region split is complete
      Assert.assertFalse(Arrays.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
      Assert.assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedBottomRegionName));
      Assert.assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));

      return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
        finalEncodedBottomRegionName);
    }
  }

  /**
   * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
   * regions to be merged are adjacent regions. This is a synchronous method.
   * @param tableName    Table name of regions to be merged.
   * @param topRegion    Start row key of first region for merging.
   * @param bottomRegion Start row key of second region for merging.
   * @return List of encoded region names with first two elements being original regions followed by
   *         the merged region.
   */
  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
    throws IOException {
    Admin admin = TEST_UTIL.getAdmin();
    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
      HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
      HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
      String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());

      // Assert that regions are ready to be merged. Region names are byte arrays and have to be
      // compared by content; assertEquals()/assertNotEquals() would only compare the references.
      Assert
        .assertFalse(Arrays.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
      Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);

      FutureUtils.get(admin.mergeRegionsAsync(
        new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));

      topLoc = regionLocator.getRegionLocation(topRegion, true);
      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
      bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();

      // Assert regions have been merged successfully
      Assert.assertArrayEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
      Assert.assertFalse(Arrays.equals(initialEncodedTopRegionName, finalEncodedTopRegionName));
      Assert.assertFalse(Arrays.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));

      return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
        finalEncodedTopRegionName);
    }
  }
}