001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.EnumSet; 026import java.util.HashMap; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.Optional; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.ClusterMetrics.Option; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.RegionLoad; 036import org.apache.hadoop.hbase.ServerLoad; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.Append; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Delete; 043import org.apache.hadoop.hbase.client.Get; 044import org.apache.hadoop.hbase.client.Increment; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.ResultScanner; 049import org.apache.hadoop.hbase.client.RowMutations; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.coprocessor.ObserverContext; 054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 056import org.apache.hadoop.hbase.coprocessor.RegionObserver; 057import org.apache.hadoop.hbase.filter.BinaryComparator; 058import org.apache.hadoop.hbase.filter.CompareFilter; 059import org.apache.hadoop.hbase.filter.RowFilter; 060import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 061import org.apache.hadoop.hbase.master.LoadBalancer; 062import org.apache.hadoop.hbase.testclassification.MediumTests; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.junit.AfterClass; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Ignore; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073@Ignore // Depends on Master being able to host regions. Needs fixing. 074@Category(MediumTests.class) 075public class TestRegionServerReadRequestMetrics { 076 077 @ClassRule 078 public static final HBaseClassTestRule CLASS_RULE = 079 HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class); 080 081 private static final Logger LOG = 082 LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class); 083 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 084 private static final TableName TABLE_NAME = TableName.valueOf("test"); 085 private static final byte[] CF1 = "c1".getBytes(); 086 private static final byte[] CF2 = "c2".getBytes(); 087 088 private static final byte[] ROW1 = "a".getBytes(); 089 private static final byte[] ROW2 = "b".getBytes(); 090 private static final byte[] ROW3 = "c".getBytes(); 091 private static final byte[] COL1 = "q1".getBytes(); 092 private static final byte[] COL2 = "q2".getBytes(); 093 private static final byte[] COL3 = "q3".getBytes(); 094 private static final byte[] VAL1 = "v1".getBytes(); 095 private static final byte[] VAL2 = "v2".getBytes(); 096 private static final byte[] VAL3 = Bytes.toBytes(0L); 097 098 private static final int MAX_TRY = 20; 099 private static final int SLEEP_MS = 100; 100 private static final int TTL = 1; 101 102 private static Admin admin; 103 private static Collection<ServerName> serverNames; 104 private static Table table; 105 private static RegionInfo regionInfo; 106 107 private static Map<Metric, Long> requestsMap = new HashMap<>(); 108 private static Map<Metric, Long> requestsMapPrev = new HashMap<>(); 109 110 @BeforeClass 111 public static void setUpOnce() throws Exception { 112 // Default starts one regionserver only. 113 TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true); 114 // TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true); 115 TEST_UTIL.startMiniCluster(); 116 admin = TEST_UTIL.getAdmin(); 117 serverNames = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 118 .getLiveServerMetrics().keySet(); 119 table = createTable(); 120 putData(); 121 List<RegionInfo> regions = admin.getRegions(TABLE_NAME); 122 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size()); 123 regionInfo = regions.get(0); 124 125 for (Metric metric : Metric.values()) { 126 requestsMap.put(metric, 0L); 127 requestsMapPrev.put(metric, 0L); 128 } 129 } 130 131 private static Table createTable() throws IOException { 132 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME); 133 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 134 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL) 135 .build()); 136 admin.createTable(builder.build()); 137 return TEST_UTIL.getConnection().getTable(TABLE_NAME); 138 } 139 140 private static void testReadRequests(long resultCount, 141 long expectedReadRequests, long expectedFilteredReadRequests) 142 throws IOException, InterruptedException { 143 updateMetricsMap(); 144 System.out.println("requestsMapPrev = " + requestsMapPrev); 145 System.out.println("requestsMap = " + requestsMap); 146 147 assertEquals(expectedReadRequests, 148 requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ)); 149 boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration()); 150 if (tablesOnMaster) { 151 // If NO tables on master, then the single regionserver in this test carries user-space 152 // tables and the meta table. The first time through, the read will be inflated by meta 153 // lookups. We don't know which test will be first through since junit randomizes. This 154 // method is used by a bunch of tests. Just do this check if master is hosting (system) 155 // regions only. 156 assertEquals(expectedReadRequests, 157 requestsMap.get(Metric.SERVER_READ) - requestsMapPrev.get(Metric.SERVER_READ)); 158 } 159 assertEquals(expectedFilteredReadRequests, 160 requestsMap.get(Metric.FILTERED_REGION_READ) 161 - requestsMapPrev.get(Metric.FILTERED_REGION_READ)); 162 assertEquals(expectedFilteredReadRequests, 163 requestsMap.get(Metric.FILTERED_SERVER_READ) 164 - requestsMapPrev.get(Metric.FILTERED_SERVER_READ)); 165 assertEquals(expectedReadRequests, resultCount); 166 } 167 168 private static void updateMetricsMap() throws IOException, InterruptedException { 169 for (Metric metric : Metric.values()) { 170 requestsMapPrev.put(metric, requestsMap.get(metric)); 171 } 172 173 ServerLoad serverLoad = null; 174 RegionLoad regionLoadOuter = null; 175 boolean metricsUpdated = false; 176 for (int i = 0; i < MAX_TRY; i++) { 177 for (ServerName serverName : serverNames) { 178 serverLoad = new ServerLoad(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 179 .getLiveServerMetrics().get(serverName)); 180 181 Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad(); 182 RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName()); 183 if (regionLoad != null) { 184 regionLoadOuter = regionLoad; 185 for (Metric metric : Metric.values()) { 186 if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) { 187 for (Metric metricInner : Metric.values()) { 188 requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner)); 189 } 190 metricsUpdated = true; 191 break; 192 } 193 } 194 } 195 } 196 if (metricsUpdated) { 197 break; 198 } 199 Thread.sleep(SLEEP_MS); 200 } 201 if (!metricsUpdated) { 202 for (Metric metric : Metric.values()) { 203 requestsMap.put(metric, getReadRequest(serverLoad, regionLoadOuter, metric)); 204 } 205 } 206 } 207 208 private static long getReadRequest(ServerLoad serverLoad, RegionLoad regionLoad, Metric metric) { 209 switch (metric) { 210 case REGION_READ: 211 return regionLoad.getReadRequestsCount(); 212 case SERVER_READ: 213 return serverLoad.getReadRequestsCount(); 214 case FILTERED_REGION_READ: 215 return regionLoad.getFilteredReadRequestsCount(); 216 case FILTERED_SERVER_READ: 217 return serverLoad.getFilteredReadRequestsCount(); 218 default: 219 throw new IllegalStateException(); 220 } 221 } 222 223 private static void putData() throws IOException { 224 Put put; 225 226 put = new Put(ROW1); 227 put.addColumn(CF1, COL1, VAL1); 228 put.addColumn(CF1, COL2, VAL2); 229 put.addColumn(CF1, COL3, VAL3); 230 table.put(put); 231 put = new Put(ROW2); 232 put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1 233 put.addColumn(CF1, COL2, VAL2); 234 table.put(put); 235 put = new Put(ROW3); 236 put.addColumn(CF1, COL1, VAL1); 237 put.addColumn(CF1, COL2, VAL2); 238 table.put(put); 239 } 240 241 private static void putTTLExpiredData() throws IOException, InterruptedException { 242 Put put; 243 244 put = new Put(ROW1); 245 put.addColumn(CF2, COL1, VAL1); 246 put.addColumn(CF2, COL2, VAL2); 247 table.put(put); 248 249 Thread.sleep(TTL * 1000); 250 251 put = new Put(ROW2); 252 put.addColumn(CF2, COL1, VAL1); 253 put.addColumn(CF2, COL2, VAL2); 254 table.put(put); 255 256 put = new Put(ROW3); 257 put.addColumn(CF2, COL1, VAL1); 258 put.addColumn(CF2, COL2, VAL2); 259 table.put(put); 260 } 261 262 @AfterClass 263 public static void tearDownOnce() throws Exception { 264 TEST_UTIL.shutdownMiniCluster(); 265 } 266 267 @Test 268 public void testReadRequestsCountNotFiltered() throws Exception { 269 int resultCount; 270 Scan scan; 271 Append append; 272 Put put; 273 Increment increment; 274 Get get; 275 276 // test for scan 277 scan = new Scan(); 278 try (ResultScanner scanner = table.getScanner(scan)) { 279 resultCount = 0; 280 for (Result ignore : scanner) { 281 resultCount++; 282 } 283 testReadRequests(resultCount, 3, 0); 284 } 285 286 // test for scan 287 scan = new Scan(ROW2, ROW3); 288 try (ResultScanner scanner = table.getScanner(scan)) { 289 resultCount = 0; 290 for (Result ignore : scanner) { 291 resultCount++; 292 } 293 testReadRequests(resultCount, 1, 0); 294 } 295 296 // test for get 297 get = new Get(ROW2); 298 Result result = table.get(get); 299 resultCount = result.isEmpty() ? 0 : 1; 300 testReadRequests(resultCount, 1, 0); 301 302 // test for increment 303 increment = new Increment(ROW1); 304 increment.addColumn(CF1, COL3, 1); 305 result = table.increment(increment); 306 resultCount = result.isEmpty() ? 0 : 1; 307 testReadRequests(resultCount, 1, 0); 308 309 // test for checkAndPut 310 put = new Put(ROW1); 311 put.addColumn(CF1, COL2, VAL2); 312 boolean checkAndPut = 313 table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put); 314 resultCount = checkAndPut ? 1 : 0; 315 testReadRequests(resultCount, 1, 0); 316 317 // test for append 318 append = new Append(ROW1); 319 append.addColumn(CF1, COL2, VAL2); 320 result = table.append(append); 321 resultCount = result.isEmpty() ? 0 : 1; 322 testReadRequests(resultCount, 1, 0); 323 324 // test for checkAndMutate 325 put = new Put(ROW1); 326 put.addColumn(CF1, COL1, VAL1); 327 RowMutations rm = new RowMutations(ROW1); 328 rm.add(put); 329 boolean checkAndMutate = 330 table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm); 331 resultCount = checkAndMutate ? 1 : 0; 332 testReadRequests(resultCount, 1, 0); 333 } 334 335 @Ignore // HBASE-19785 336 @Test 337 public void testReadRequestsCountWithFilter() throws Exception { 338 int resultCount; 339 Scan scan; 340 341 // test for scan 342 scan = new Scan(); 343 scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 344 try (ResultScanner scanner = table.getScanner(scan)) { 345 resultCount = 0; 346 for (Result ignore : scanner) { 347 resultCount++; 348 } 349 testReadRequests(resultCount, 2, 1); 350 } 351 352 // test for scan 353 scan = new Scan(); 354 scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1))); 355 try (ResultScanner scanner = table.getScanner(scan)) { 356 resultCount = 0; 357 for (Result ignore : scanner) { 358 resultCount++; 359 } 360 testReadRequests(resultCount, 1, 2); 361 } 362 363 // test for scan 364 scan = new Scan(ROW2, ROW3); 365 scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1))); 366 try (ResultScanner scanner = table.getScanner(scan)) { 367 resultCount = 0; 368 for (Result ignore : scanner) { 369 resultCount++; 370 } 371 testReadRequests(resultCount, 0, 1); 372 } 373 374 // fixme filtered get should not increase readRequestsCount 375// Get get = new Get(ROW2); 376// get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1)); 377// Result result = table.get(get); 378// resultCount = result.isEmpty() ? 0 : 1; 379// testReadRequests(resultCount, 0, 1); 380 } 381 382 @Ignore // HBASE-19785 383 @Test 384 public void testReadRequestsCountWithDeletedRow() throws Exception { 385 try { 386 Delete delete = new Delete(ROW3); 387 table.delete(delete); 388 389 Scan scan = new Scan(); 390 try (ResultScanner scanner = table.getScanner(scan)) { 391 int resultCount = 0; 392 for (Result ignore : scanner) { 393 resultCount++; 394 } 395 testReadRequests(resultCount, 2, 1); 396 } 397 } finally { 398 Put put = new Put(ROW3); 399 put.addColumn(CF1, COL1, VAL1); 400 put.addColumn(CF1, COL2, VAL2); 401 table.put(put); 402 } 403 } 404 405 @Test 406 public void testReadRequestsCountWithTTLExpiration() throws Exception { 407 putTTLExpiredData(); 408 409 Scan scan = new Scan(); 410 scan.addFamily(CF2); 411 try (ResultScanner scanner = table.getScanner(scan)) { 412 int resultCount = 0; 413 for (Result ignore : scanner) { 414 resultCount++; 415 } 416 testReadRequests(resultCount, 2, 1); 417 } 418 } 419 420 @Ignore // See HBASE-19785 421 @Test 422 public void testReadRequestsWithCoprocessor() throws Exception { 423 TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor"); 424 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 425 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)); 426 builder.setCoprocessor(ScanRegionCoprocessor.class.getName()); 427 admin.createTable(builder.build()); 428 429 try { 430 TEST_UTIL.waitTableAvailable(tableName); 431 List<RegionInfo> regionInfos = admin.getRegions(tableName); 432 assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size()); 433 boolean success = true; 434 int i = 0; 435 for (; i < MAX_TRY; i++) { 436 try { 437 testReadRequests(regionInfos.get(0).getRegionName(), 3); 438 } catch (Throwable t) { 439 LOG.warn("Got exception when try " + i + " times", t); 440 Thread.sleep(SLEEP_MS); 441 success = false; 442 } 443 if (success) { 444 break; 445 } 446 } 447 if (i == MAX_TRY) { 448 fail("Failed to get right read requests metric after try " + i + " times"); 449 } 450 } finally { 451 admin.disableTable(tableName); 452 admin.deleteTable(tableName); 453 } 454 } 455 456 private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception { 457 for (ServerName serverName : serverNames) { 458 ServerLoad serverLoad = new ServerLoad(admin.getClusterMetrics( 459 EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().get(serverName)); 460 Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad(); 461 RegionLoad regionLoad = regionsLoad.get(regionName); 462 if (regionLoad != null) { 463 LOG.debug("server read request is " + serverLoad.getReadRequestsCount() 464 + ", region read request is " + regionLoad.getReadRequestsCount()); 465 assertEquals(3, serverLoad.getReadRequestsCount()); 466 assertEquals(3, regionLoad.getReadRequestsCount()); 467 } 468 } 469 } 470 471 public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver { 472 @Override 473 public Optional<RegionObserver> getRegionObserver() { 474 return Optional.of(this); 475 } 476 477 @Override 478 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) { 479 RegionCoprocessorEnvironment env = c.getEnvironment(); 480 Region region = env.getRegion(); 481 try { 482 putData(region); 483 RegionScanner scanner = region.getScanner(new Scan()); 484 List<Cell> result = new LinkedList<>(); 485 while (scanner.next(result)) { 486 result.clear(); 487 } 488 } catch (Exception e) { 489 LOG.warn("Got exception in coprocessor", e); 490 } 491 } 492 493 private void putData(Region region) throws Exception { 494 Put put = new Put(ROW1); 495 put.addColumn(CF1, COL1, VAL1); 496 region.put(put); 497 put = new Put(ROW2); 498 put.addColumn(CF1, COL1, VAL1); 499 region.put(put); 500 put = new Put(ROW3); 501 put.addColumn(CF1, COL1, VAL1); 502 region.put(put); 503 } 504 } 505 506 private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ} 507}