001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.fail; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ClusterMetrics.Option; 033import org.apache.hadoop.hbase.CompareOperator; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.RegionMetrics; 036import org.apache.hadoop.hbase.ServerMetrics; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.Append; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.ResultScanner; 049import org.apache.hadoop.hbase.client.RowMutations; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.coprocessor.ObserverContext; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 056import org.apache.hadoop.hbase.coprocessor.RegionObserver; 057import org.apache.hadoop.hbase.filter.BinaryComparator; 058import org.apache.hadoop.hbase.filter.RowFilter; 059import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 060import org.apache.hadoop.hbase.testclassification.MediumTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.junit.jupiter.api.AfterAll; 063import org.junit.jupiter.api.BeforeAll; 064import org.junit.jupiter.api.Disabled; 065import org.junit.jupiter.api.Tag; 066import org.junit.jupiter.api.Test; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070@Disabled // Depends on Master being able to host regions. Needs fixing. 071@Tag(MediumTests.TAG) 072public class TestRegionServerReadRequestMetrics { 073 074 private static final Logger LOG = 075 LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class); 076 private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 077 private static final TableName TABLE_NAME = TableName.valueOf("test"); 078 private static final byte[] CF1 = Bytes.toBytes("c1"); 079 private static final byte[] CF2 = Bytes.toBytes("c2"); 080 081 private static final byte[] ROW1 = Bytes.toBytes("a"); 082 private static final byte[] ROW2 = Bytes.toBytes("b"); 083 private static final byte[] ROW3 = Bytes.toBytes("c"); 084 private static final byte[] COL1 = Bytes.toBytes("q1"); 085 private static final byte[] COL2 = Bytes.toBytes("q2"); 086 private static final byte[] COL3 = Bytes.toBytes("q3"); 087 private static final byte[] VAL1 = Bytes.toBytes("v1"); 088 private static final byte[] VAL2 = Bytes.toBytes("v2"); 089 private static final byte[] VAL3 = Bytes.toBytes(0L); 090 091 private static final int MAX_TRY = 20; 092 private static final int SLEEP_MS = 100; 093 private static final int TTL = 1; 094 095 private static Admin admin; 096 private static Collection<ServerName> serverNames; 097 private static Table table; 098 private static RegionInfo regionInfo; 099 100 private static Map<Metric, Long> requestsMap = new HashMap<>(); 101 private static Map<Metric, Long> requestsMapPrev = new HashMap<>(); 102 103 @BeforeAll 104 public static void setUpOnce() throws Exception { 105 TEST_UTIL.startMiniCluster(); 106 admin = TEST_UTIL.getAdmin(); 107 serverNames = 108 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet(); 109 table = createTable(); 110 putData(); 111 List<RegionInfo> regions = admin.getRegions(TABLE_NAME); 112 assertEquals(1, regions.size(), "Table " + TABLE_NAME + " should have 1 region"); 113 regionInfo = regions.get(0); 114 115 for (Metric metric : Metric.values()) { 116 requestsMap.put(metric, 0L); 117 requestsMapPrev.put(metric, 0L); 118 } 119 } 120 121 private static Table createTable() throws IOException { 122 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 123 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 124 builder 125 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build()); 126 admin.createTable(builder.build()); 127 return TEST_UTIL.getConnection().getTable(TABLE_NAME); 128 } 129 130 private static void testReadRequests(long resultCount, long expectedReadRequests, 131 long expectedFilteredReadRequests) throws IOException, InterruptedException { 132 updateMetricsMap(); 133 System.out.println("requestsMapPrev = " + requestsMapPrev); 134 System.out.println("requestsMap = " + requestsMap); 135 136 assertEquals(expectedReadRequests, 137 requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ)); 138 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ) 139 - requestsMapPrev.get(Metric.FILTERED_REGION_READ)); 140 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ) 141 - requestsMapPrev.get(Metric.FILTERED_SERVER_READ)); 142 assertEquals(expectedReadRequests, resultCount); 143 } 144 145 private static void updateMetricsMap() throws IOException, InterruptedException { 146 for (Metric metric : Metric.values()) { 147 requestsMapPrev.put(metric, requestsMap.get(metric)); 148 } 149 150 ServerMetrics serverMetrics = null; 151 RegionMetrics regionMetricsOuter = null; 152 boolean metricsUpdated = false; 153 for (int i = 0; i < MAX_TRY; i++) { 154 for (ServerName serverName : serverNames) { 155 serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 156 .getLiveServerMetrics().get(serverName); 157 158 Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics(); 159 RegionMetrics regionMetric = regionMetrics.get(regionInfo.getRegionName()); 160 if (regionMetric != null) { 161 regionMetricsOuter = regionMetric; 162 for (Metric metric : Metric.values()) { 163 if (getReadRequest(serverMetrics, regionMetric, metric) > requestsMapPrev.get(metric)) { 164 for (Metric metricInner : Metric.values()) { 165 requestsMap.put(metricInner, 166 getReadRequest(serverMetrics, regionMetric, metricInner)); 167 } 168 metricsUpdated = true; 169 break; 170 } 171 } 172 } 173 } 174 if (metricsUpdated) { 175 break; 176 } 177 Thread.sleep(SLEEP_MS); 178 } 179 if (!metricsUpdated) { 180 for (Metric metric : Metric.values()) { 181 requestsMap.put(metric, getReadRequest(serverMetrics, regionMetricsOuter, metric)); 182 } 183 } 184 } 185 186 private static long getReadRequest(ServerMetrics serverMetrics, RegionMetrics regionMetrics, 187 Metric metric) { 188 switch (metric) { 189 case REGION_READ: 190 return regionMetrics.getReadRequestCount(); 191 case SERVER_READ: 192 return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName()) 193 .getReadRequestCount(); 194 case FILTERED_REGION_READ: 195 return regionMetrics.getFilteredReadRequestCount(); 196 case FILTERED_SERVER_READ: 197 return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName()) 198 .getFilteredReadRequestCount(); 199 default: 200 throw new IllegalStateException(); 201 } 202 } 203 204 private static void putData() throws IOException { 205 Put put; 206 207 put = new Put(ROW1); 208 put.addColumn(CF1, COL1, VAL1); 209 put.addColumn(CF1, COL2, VAL2); 210 put.addColumn(CF1, COL3, VAL3); 211 table.put(put); 212 put = new Put(ROW2); 213 put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1 214 put.addColumn(CF1, COL2, VAL2); 215 table.put(put); 216 put = new Put(ROW3); 217 put.addColumn(CF1, COL1, VAL1); 218 put.addColumn(CF1, COL2, VAL2); 219 table.put(put); 220 } 221 222 private static void putTTLExpiredData() throws IOException, InterruptedException { 223 Put put; 224 225 put = new Put(ROW1); 226 put.addColumn(CF2, COL1, VAL1); 227 put.addColumn(CF2, COL2, VAL2); 228 table.put(put); 229 230 Thread.sleep(TTL * 1000); 231 232 put = new Put(ROW2); 233 put.addColumn(CF2, COL1, VAL1); 234 put.addColumn(CF2, COL2, VAL2); 235 table.put(put); 236 237 put = new Put(ROW3); 238 put.addColumn(CF2, COL1, VAL1); 239 put.addColumn(CF2, COL2, VAL2); 240 table.put(put); 241 } 242 243 @AfterAll 244 public static void tearDownOnce() throws Exception { 245 TEST_UTIL.shutdownMiniCluster(); 246 } 247 248 @Test 249 public void testReadRequestsCountNotFiltered() throws Exception { 250 int resultCount; 251 Scan scan; 252 Append append; 253 Put put; 254 Increment increment; 255 Get get; 256 257 // test for scan 258 scan = new Scan(); 259 try (ResultScanner scanner = table.getScanner(scan)) { 260 resultCount = 0; 261 for (Result ignore : scanner) { 262 resultCount++; 263 } 264 testReadRequests(resultCount, 3, 0); 265 } 266 267 // test for scan 268 scan = new Scan().withStartRow(ROW2).withStopRow(ROW3); 269 try (ResultScanner scanner = table.getScanner(scan)) { 270 resultCount = 0; 271 for (Result ignore : scanner) { 272 resultCount++; 273 } 274 testReadRequests(resultCount, 1, 0); 275 } 276 277 // test for get 278 get = new Get(ROW2); 279 Result result = table.get(get); 280 resultCount = result.isEmpty() ? 0 : 1; 281 testReadRequests(resultCount, 1, 0); 282 283 // test for increment 284 increment = new Increment(ROW1); 285 increment.addColumn(CF1, COL3, 1); 286 result = table.increment(increment); 287 resultCount = result.isEmpty() ? 0 : 1; 288 testReadRequests(resultCount, 1, 0); 289 290 // test for checkAndPut 291 put = new Put(ROW1); 292 put.addColumn(CF1, COL2, VAL2); 293 boolean checkAndPut = 294 table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put); 295 resultCount = checkAndPut ? 1 : 0; 296 testReadRequests(resultCount, 1, 0); 297 298 // test for append 299 append = new Append(ROW1); 300 append.addColumn(CF1, COL2, VAL2); 301 result = table.append(append); 302 resultCount = result.isEmpty() ? 0 : 1; 303 testReadRequests(resultCount, 1, 0); 304 305 // test for checkAndMutate 306 put = new Put(ROW1); 307 put.addColumn(CF1, COL1, VAL1); 308 RowMutations rm = new RowMutations(ROW1); 309 rm.add(put); 310 boolean checkAndMutate = 311 table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm); 312 resultCount = checkAndMutate ? 1 : 0; 313 testReadRequests(resultCount, 1, 0); 314 } 315 316 @Disabled // HBASE-19785 317 @Test 318 public void testReadRequestsCountWithFilter() throws Exception { 319 int resultCount; 320 Scan scan; 321 322 // test for scan 323 scan = new Scan(); 324 scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareOperator.EQUAL, VAL1)); 325 try (ResultScanner scanner = table.getScanner(scan)) { 326 resultCount = 0; 327 for (Result ignore : scanner) { 328 resultCount++; 329 } 330 testReadRequests(resultCount, 2, 1); 331 } 332 333 // test for scan 334 scan = new Scan(); 335 scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1))); 336 try (ResultScanner scanner = table.getScanner(scan)) { 337 resultCount = 0; 338 for (Result ignore : scanner) { 339 resultCount++; 340 } 341 testReadRequests(resultCount, 1, 2); 342 } 343 344 // test for scan 345 scan = new Scan().withStartRow(ROW2).withStopRow(ROW3); 346 scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1))); 347 try (ResultScanner scanner = table.getScanner(scan)) { 348 resultCount = 0; 349 for (Result ignore : scanner) { 350 resultCount++; 351 } 352 testReadRequests(resultCount, 0, 1); 353 } 354 355 // fixme filtered get should not increase readRequestsCount 356 // Get get = new Get(ROW2); 357 // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 358 // Result result = table.get(get); 359 // resultCount = result.isEmpty() ? 0 : 1; 360 // testReadRequests(resultCount, 0, 1); 361 } 362 363 @Disabled // HBASE-19785 364 @Test 365 public void testReadRequestsCountWithDeletedRow() throws Exception { 366 try { 367 Delete delete = new Delete(ROW3); 368 table.delete(delete); 369 370 Scan scan = new Scan(); 371 try (ResultScanner scanner = table.getScanner(scan)) { 372 int resultCount = 0; 373 for (Result ignore : scanner) { 374 resultCount++; 375 } 376 testReadRequests(resultCount, 2, 1); 377 } 378 } finally { 379 Put put = new Put(ROW3); 380 put.addColumn(CF1, COL1, VAL1); 381 put.addColumn(CF1, COL2, VAL2); 382 table.put(put); 383 } 384 } 385 386 @Test 387 public void testReadRequestsCountWithTTLExpiration() throws Exception { 388 putTTLExpiredData(); 389 390 Scan scan = new Scan(); 391 scan.addFamily(CF2); 392 try (ResultScanner scanner = table.getScanner(scan)) { 393 int resultCount = 0; 394 for (Result ignore : scanner) { 395 resultCount++; 396 } 397 testReadRequests(resultCount, 2, 1); 398 } 399 } 400 401 @Disabled // See HBASE-19785 402 @Test 403 public void testReadRequestsWithCoprocessor() throws Exception { 404 TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor"); 405 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 406 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 407 builder.setCoprocessor(ScanRegionCoprocessor.class.getName()); 408 admin.createTable(builder.build()); 409 410 try { 411 TEST_UTIL.waitTableAvailable(tableName); 412 List<RegionInfo> regionInfos = admin.getRegions(tableName); 413 assertEquals(1, regionInfos.size(), "Table " + TABLE_NAME + " should have 1 region"); 414 boolean success = true; 415 int i = 0; 416 for (; i < MAX_TRY; i++) { 417 try { 418 testReadRequests(regionInfos.get(0).getRegionName(), 3); 419 } catch (Throwable t) { 420 LOG.warn("Got exception when try " + i + " times", t); 421 Thread.sleep(SLEEP_MS); 422 success = false; 423 } 424 if (success) { 425 break; 426 } 427 } 428 if (i == MAX_TRY) { 429 fail("Failed to get right read requests metric after try " + i + " times"); 430 } 431 } finally { 432 admin.disableTable(tableName); 433 admin.deleteTable(tableName); 434 } 435 } 436 437 private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception { 438 for (ServerName serverName : serverNames) { 439 ServerMetrics serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 440 .getLiveServerMetrics().get(serverName); 441 Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics(); 442 RegionMetrics regionMetric = regionMetrics.get(regionName); 443 if (regionMetric != null) { 444 LOG.debug("server read request is " 445 + serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount() 446 + ", region read request is " + regionMetric.getReadRequestCount()); 447 assertEquals(3, serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount()); 448 assertEquals(3, regionMetric.getReadRequestCount()); 449 } 450 } 451 } 452 453 public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver { 454 @Override 455 public Optional<RegionObserver> getRegionObserver() { 456 return Optional.of(this); 457 } 458 459 @Override 460 public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c) { 461 RegionCoprocessorEnvironment env = c.getEnvironment(); 462 Region region = env.getRegion(); 463 try { 464 putData(region); 465 RegionScanner scanner = region.getScanner(new Scan()); 466 List<Cell> result = new LinkedList<>(); 467 while (scanner.next(result)) { 468 result.clear(); 469 } 470 } catch (Exception e) { 471 LOG.warn("Got exception in coprocessor", e); 472 } 473 } 474 475 private void putData(Region region) throws Exception { 476 Put put = new Put(ROW1); 477 put.addColumn(CF1, COL1, VAL1); 478 region.put(put); 479 put = new Put(ROW2); 480 put.addColumn(CF1, COL1, VAL1); 481 region.put(put); 482 put = new Put(ROW3); 483 put.addColumn(CF1, COL1, VAL1); 484 region.put(put); 485 } 486 } 487 488 private enum Metric { 489 REGION_READ, 490 SERVER_READ, 491 FILTERED_REGION_READ, 492 FILTERED_SERVER_READ 493 } 494}