001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ClusterMetrics.Option; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.RegionLoad; 036import org.apache.hadoop.hbase.ServerLoad; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.Append; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.ResultScanner; 049import org.apache.hadoop.hbase.client.RowMutations; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.coprocessor.ObserverContext; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 056import org.apache.hadoop.hbase.coprocessor.RegionObserver; 057import org.apache.hadoop.hbase.filter.BinaryComparator; 058import org.apache.hadoop.hbase.filter.CompareFilter; 059import org.apache.hadoop.hbase.filter.RowFilter; 060import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 061import org.apache.hadoop.hbase.master.LoadBalancer; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.junit.AfterClass; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Ignore; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073@Ignore // Depends on Master being able to host regions. Needs fixing. 074@Category(MediumTests.class) 075public class TestRegionServerReadRequestMetrics { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class); 080 081 private static final Logger LOG = 082 LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class); 083 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 084 private static final TableName TABLE_NAME = TableName.valueOf("test"); 085 private static final byte[] CF1 = "c1".getBytes(); 086 private static final byte[] CF2 = "c2".getBytes(); 087 088 private static final byte[] ROW1 = "a".getBytes(); 089 private static final byte[] ROW2 = "b".getBytes(); 090 private static final byte[] ROW3 = "c".getBytes(); 091 private static final byte[] COL1 = "q1".getBytes(); 092 private static final byte[] COL2 = "q2".getBytes(); 093 private static final byte[] COL3 = "q3".getBytes(); 094 private static final byte[] VAL1 = "v1".getBytes(); 095 private static final byte[] VAL2 = "v2".getBytes(); 096 private static final byte[] VAL3 = Bytes.toBytes(0L); 097 098 private static final int MAX_TRY = 20; 099 private static final int SLEEP_MS = 100; 100 private static final int TTL = 1; 101 102 private static Admin admin; 103 private static Collection<ServerName> serverNames; 104 private static Table table; 105 private static RegionInfo regionInfo; 106 107 private static Map<Metric, Long> requestsMap = new HashMap<>(); 108 private static Map<Metric, Long> requestsMapPrev = new HashMap<>(); 109 110 @BeforeClass 111 public static void setUpOnce() throws Exception { 112 // Default starts one regionserver only. 113 TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 114 // TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 115 TEST_UTIL.startMiniCluster(); 116 admin = TEST_UTIL.getAdmin(); 117 serverNames = 118 admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet(); 119 table = createTable(); 120 putData(); 121 List<RegionInfo> regions = admin.getRegions(TABLE_NAME); 122 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size()); 123 regionInfo = regions.get(0); 124 125 for (Metric metric : Metric.values()) { 126 requestsMap.put(metric, 0L); 127 requestsMapPrev.put(metric, 0L); 128 } 129 } 130 131 private static Table createTable() throws IOException { 132 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 133 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 134 builder 135 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build()); 136 admin.createTable(builder.build()); 137 return TEST_UTIL.getConnection().getTable(TABLE_NAME); 138 } 139 140 private static void testReadRequests(long resultCount, long expectedReadRequests, 141 long expectedFilteredReadRequests) throws IOException, InterruptedException { 142 updateMetricsMap(); 143 System.out.println("requestsMapPrev = " + requestsMapPrev); 144 System.out.println("requestsMap = " + requestsMap); 145 146 assertEquals(expectedReadRequests, 147 requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ)); 148 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration()); 149 if (tablesOnMaster) { 150 // If NO tables on master, then the single regionserver in this test carries user-space 151 // tables and the meta table. The first time through, the read will be inflated by meta 152 // lookups. We don't know which test will be first through since junit randomizes. This 153 // method is used by a bunch of tests. Just do this check if master is hosting (system) 154 // regions only. 155 assertEquals(expectedReadRequests, 156 requestsMap.get(Metric.SERVER_READ) - requestsMapPrev.get(Metric.SERVER_READ)); 157 } 158 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ) 159 - requestsMapPrev.get(Metric.FILTERED_REGION_READ)); 160 assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ) 161 - requestsMapPrev.get(Metric.FILTERED_SERVER_READ)); 162 assertEquals(expectedReadRequests, resultCount); 163 } 164 165 private static void updateMetricsMap() throws IOException, InterruptedException { 166 for (Metric metric : Metric.values()) { 167 requestsMapPrev.put(metric, requestsMap.get(metric)); 168 } 169 170 ServerLoad serverLoad = null; 171 RegionLoad regionLoadOuter = null; 172 boolean metricsUpdated = false; 173 for (int i = 0; i < MAX_TRY; i++) { 174 for (ServerName serverName : serverNames) { 175 serverLoad = new ServerLoad(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 176 .getLiveServerMetrics().get(serverName)); 177 178 Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad(); 179 RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName()); 180 if (regionLoad != null) { 181 regionLoadOuter = regionLoad; 182 for (Metric metric : Metric.values()) { 183 if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) { 184 for (Metric metricInner : Metric.values()) { 185 requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner)); 186 } 187 metricsUpdated = true; 188 break; 189 } 190 } 191 } 192 } 193 if (metricsUpdated) { 194 break; 195 } 196 Thread.sleep(SLEEP_MS); 197 } 198 if (!metricsUpdated) { 199 for (Metric metric : Metric.values()) { 200 requestsMap.put(metric, getReadRequest(serverLoad, regionLoadOuter, metric)); 201 } 202 } 203 } 204 205 private static long getReadRequest(ServerLoad serverLoad, RegionLoad regionLoad, Metric metric) { 206 switch (metric) { 207 case REGION_READ: 208 return regionLoad.getReadRequestsCount(); 209 case SERVER_READ: 210 return serverLoad.getReadRequestsCount(); 211 case FILTERED_REGION_READ: 212 return regionLoad.getFilteredReadRequestsCount(); 213 case FILTERED_SERVER_READ: 214 return serverLoad.getFilteredReadRequestsCount(); 215 default: 216 throw new IllegalStateException(); 217 } 218 } 219 220 private static void putData() throws IOException { 221 Put put; 222 223 put = new Put(ROW1); 224 put.addColumn(CF1, COL1, VAL1); 225 put.addColumn(CF1, COL2, VAL2); 226 put.addColumn(CF1, COL3, VAL3); 227 table.put(put); 228 put = new Put(ROW2); 229 put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1 230 put.addColumn(CF1, COL2, VAL2); 231 table.put(put); 232 put = new Put(ROW3); 233 put.addColumn(CF1, COL1, VAL1); 234 put.addColumn(CF1, COL2, VAL2); 235 table.put(put); 236 } 237 238 private static void putTTLExpiredData() throws IOException, InterruptedException { 239 Put put; 240 241 put = new Put(ROW1); 242 put.addColumn(CF2, COL1, VAL1); 243 put.addColumn(CF2, COL2, VAL2); 244 table.put(put); 245 246 Thread.sleep(TTL * 1000); 247 248 put = new Put(ROW2); 249 put.addColumn(CF2, COL1, VAL1); 250 put.addColumn(CF2, COL2, VAL2); 251 table.put(put); 252 253 put = new Put(ROW3); 254 put.addColumn(CF2, COL1, VAL1); 255 put.addColumn(CF2, COL2, VAL2); 256 table.put(put); 257 } 258 259 @AfterClass 260 public static void tearDownOnce() throws Exception { 261 TEST_UTIL.shutdownMiniCluster(); 262 } 263 264 @Test 265 public void testReadRequestsCountNotFiltered() throws Exception { 266 int resultCount; 267 Scan scan; 268 Append append; 269 Put put; 270 Increment increment; 271 Get get; 272 273 // test for scan 274 scan = new Scan(); 275 try (ResultScanner scanner = table.getScanner(scan)) { 276 resultCount = 0; 277 for (Result ignore : scanner) { 278 resultCount++; 279 } 280 testReadRequests(resultCount, 3, 0); 281 } 282 283 // test for scan 284 scan = new Scan(ROW2, ROW3); 285 try (ResultScanner scanner = table.getScanner(scan)) { 286 resultCount = 0; 287 for (Result ignore : scanner) { 288 resultCount++; 289 } 290 testReadRequests(resultCount, 1, 0); 291 } 292 293 // test for get 294 get = new Get(ROW2); 295 Result result = table.get(get); 296 resultCount = result.isEmpty() ? 0 : 1; 297 testReadRequests(resultCount, 1, 0); 298 299 // test for increment 300 increment = new Increment(ROW1); 301 increment.addColumn(CF1, COL3, 1); 302 result = table.increment(increment); 303 resultCount = result.isEmpty() ? 0 : 1; 304 testReadRequests(resultCount, 1, 0); 305 306 // test for checkAndPut 307 put = new Put(ROW1); 308 put.addColumn(CF1, COL2, VAL2); 309 boolean checkAndPut = 310 table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put); 311 resultCount = checkAndPut ? 1 : 0; 312 testReadRequests(resultCount, 1, 0); 313 314 // test for append 315 append = new Append(ROW1); 316 append.addColumn(CF1, COL2, VAL2); 317 result = table.append(append); 318 resultCount = result.isEmpty() ? 0 : 1; 319 testReadRequests(resultCount, 1, 0); 320 321 // test for checkAndMutate 322 put = new Put(ROW1); 323 put.addColumn(CF1, COL1, VAL1); 324 RowMutations rm = new RowMutations(ROW1); 325 rm.add(put); 326 boolean checkAndMutate = 327 table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm); 328 resultCount = checkAndMutate ? 1 : 0; 329 testReadRequests(resultCount, 1, 0); 330 } 331 332 @Ignore // HBASE-19785 333 @Test 334 public void testReadRequestsCountWithFilter() throws Exception { 335 int resultCount; 336 Scan scan; 337 338 // test for scan 339 scan = new Scan(); 340 scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 341 try (ResultScanner scanner = table.getScanner(scan)) { 342 resultCount = 0; 343 for (Result ignore : scanner) { 344 resultCount++; 345 } 346 testReadRequests(resultCount, 2, 1); 347 } 348 349 // test for scan 350 scan = new Scan(); 351 scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1))); 352 try (ResultScanner scanner = table.getScanner(scan)) { 353 resultCount = 0; 354 for (Result ignore : scanner) { 355 resultCount++; 356 } 357 testReadRequests(resultCount, 1, 2); 358 } 359 360 // test for scan 361 scan = new Scan(ROW2, ROW3); 362 scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1))); 363 try (ResultScanner scanner = table.getScanner(scan)) { 364 resultCount = 0; 365 for (Result ignore : scanner) { 366 resultCount++; 367 } 368 testReadRequests(resultCount, 0, 1); 369 } 370 371 // fixme filtered get should not increase readRequestsCount 372 // Get get = new Get(ROW2); 373 // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 374 // Result result = table.get(get); 375 // resultCount = result.isEmpty() ? 0 : 1; 376 // testReadRequests(resultCount, 0, 1); 377 } 378 379 @Ignore // HBASE-19785 380 @Test 381 public void testReadRequestsCountWithDeletedRow() throws Exception { 382 try { 383 Delete delete = new Delete(ROW3); 384 table.delete(delete); 385 386 Scan scan = new Scan(); 387 try (ResultScanner scanner = table.getScanner(scan)) { 388 int resultCount = 0; 389 for (Result ignore : scanner) { 390 resultCount++; 391 } 392 testReadRequests(resultCount, 2, 1); 393 } 394 } finally { 395 Put put = new Put(ROW3); 396 put.addColumn(CF1, COL1, VAL1); 397 put.addColumn(CF1, COL2, VAL2); 398 table.put(put); 399 } 400 } 401 402 @Test 403 public void testReadRequestsCountWithTTLExpiration() throws Exception { 404 putTTLExpiredData(); 405 406 Scan scan = new Scan(); 407 scan.addFamily(CF2); 408 try (ResultScanner scanner = table.getScanner(scan)) { 409 int resultCount = 0; 410 for (Result ignore : scanner) { 411 resultCount++; 412 } 413 testReadRequests(resultCount, 2, 1); 414 } 415 } 416 417 @Ignore // See HBASE-19785 418 @Test 419 public void testReadRequestsWithCoprocessor() throws Exception { 420 TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor"); 421 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 422 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 423 builder.setCoprocessor(ScanRegionCoprocessor.class.getName()); 424 admin.createTable(builder.build()); 425 426 try { 427 TEST_UTIL.waitTableAvailable(tableName); 428 List<RegionInfo> regionInfos = admin.getRegions(tableName); 429 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size()); 430 boolean success = true; 431 int i = 0; 432 for (; i < MAX_TRY; i++) { 433 try { 434 testReadRequests(regionInfos.get(0).getRegionName(), 3); 435 } catch (Throwable t) { 436 LOG.warn("Got exception when try " + i + " times", t); 437 Thread.sleep(SLEEP_MS); 438 success = false; 439 } 440 if (success) { 441 break; 442 } 443 } 444 if (i == MAX_TRY) { 445 fail("Failed to get right read requests metric after try " + i + " times"); 446 } 447 } finally { 448 admin.disableTable(tableName); 449 admin.deleteTable(tableName); 450 } 451 } 452 453 private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception { 454 for (ServerName serverName : serverNames) { 455 ServerLoad serverLoad = new ServerLoad(admin 456 .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().get(serverName)); 457 Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad(); 458 RegionLoad regionLoad = regionsLoad.get(regionName); 459 if (regionLoad != null) { 460 LOG.debug("server read request is " + serverLoad.getReadRequestsCount() 461 + ", region read request is " + regionLoad.getReadRequestsCount()); 462 assertEquals(3, serverLoad.getReadRequestsCount()); 463 assertEquals(3, regionLoad.getReadRequestsCount()); 464 } 465 } 466 } 467 468 public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver { 469 @Override 470 public Optional<RegionObserver> getRegionObserver() { 471 return Optional.of(this); 472 } 473 474 @Override 475 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) { 476 RegionCoprocessorEnvironment env = c.getEnvironment(); 477 Region region = env.getRegion(); 478 try { 479 putData(region); 480 RegionScanner scanner = region.getScanner(new Scan()); 481 List<Cell> result = new LinkedList<>(); 482 while (scanner.next(result)) { 483 result.clear(); 484 } 485 } catch (Exception e) { 486 LOG.warn("Got exception in coprocessor", e); 487 } 488 } 489 490 private void putData(Region region) throws Exception { 491 Put put = new Put(ROW1); 492 put.addColumn(CF1, COL1, VAL1); 493 region.put(put); 494 put = new Put(ROW2); 495 put.addColumn(CF1, COL1, VAL1); 496 region.put(put); 497 put = new Put(ROW3); 498 put.addColumn(CF1, COL1, VAL1); 499 region.put(put); 500 } 501 } 502 503 private enum Metric { 504 REGION_READ, 505 SERVER_READ, 506 FILTERED_REGION_READ, 507 FILTERED_SERVER_READ 508 } 509}