/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises {@link ClusterMetrics} as returned by {@link Admin#getClusterMetrics()} and
 * {@link AsyncAdmin#getClusterMetrics()} against a mini cluster started with {@link #MASTERS}
 * masters and {@link #SLAVES} region servers, one of which is stopped during class setup so
 * that dead-server reporting can be checked.
 */
@Category(MediumTests.class)
public class TestClientClusterMetrics {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestClientClusterMetrics.class);

  private static HBaseTestingUtility UTIL;
  private static Admin ADMIN;
  private final static int SLAVES = 5;
  private final static int MASTERS = 3;
  private static MiniHBaseCluster CLUSTER;
  /** The region server that {@link #setUpBeforeClass()} stops on purpose. */
  private static HRegionServer DEAD;
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final byte[] CF = Bytes.toBytes("cf");

  /**
   * Starts the mini cluster with {@link MyObserver} installed as a master coprocessor, then
   * stops the last live region server and blocks until its thread has exited.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
    UTIL = new HBaseTestingUtility(conf);
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
    UTIL.startMiniCluster(option);
    CLUSTER = UTIL.getHBaseCluster();
    CLUSTER.waitForActiveAndReadyMaster();
    ADMIN = UTIL.getAdmin();
    // Kill one region server
    List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
    RegionServerThread rst = rsts.get(rsts.size() - 1);
    DEAD = rst.getRegionServer();
    DEAD.stop("Test dead servers metrics");
    while (rst.isAlive()) {
      Thread.sleep(500);
    }
  }

  /**
   * Checks that the no-argument {@code getClusterMetrics()} agrees with an explicit request
   * for every {@link Option}.
   */
  @Test
  public void testDefaults() throws Exception {
    ClusterMetrics origin = ADMIN.getClusterMetrics();
    ClusterMetrics defaults = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class));
    Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
    Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
    Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
    Assert.assertEquals(origin.getBackupMasterNames().size(),
        defaults.getBackupMasterNames().size());
    Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
    Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
    Assert.assertEquals(origin.getLiveServerMetrics().size(),
        defaults.getLiveServerMetrics().size());
    Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
    Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
    Assert.assertEquals(ADMIN.getRegionServers().size(), defaults.getServersName().size());
  }

  /**
   * Same comparison as {@link #testDefaults()}, but through {@link AsyncAdmin}; additionally
   * compares the per-table region state counts of both results.
   */
  @Test
  public void testAsyncClient() throws Exception {
    try (AsyncConnection asyncConnect = ConnectionFactory.createAsyncConnection(
        UTIL.getConfiguration()).get()) {
      AsyncAdmin asyncAdmin = asyncConnect.getAdmin();
      CompletableFuture<ClusterMetrics> originFuture =
          asyncAdmin.getClusterMetrics();
      CompletableFuture<ClusterMetrics> defaultsFuture =
          asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class));
      ClusterMetrics origin = originFuture.get();
      ClusterMetrics defaults = defaultsFuture.get();
      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
      Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
      Assert.assertEquals(origin.getBackupMasterNames().size(),
          defaults.getBackupMasterNames().size());
      Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
      Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
      Assert.assertEquals(origin.getLiveServerMetrics().size(),
          defaults.getLiveServerMetrics().size());
      Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
      Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
      origin.getTableRegionStatesCount().forEach(((tableName, regionStatesCount) -> {
        RegionStatesCount defaultRegionStatesCount = defaults.getTableRegionStatesCount()
            .get(tableName);
        Assert.assertEquals(defaultRegionStatesCount, regionStatesCount);
      }));
    }
  }

  /**
   * Verifies the live/dead server lists: every region server thread that is still alive is
   * reported as live, and the server stopped in setup is the only dead one.
   */
  @Test
  public void testLiveAndDeadServersStatus() throws Exception {
    // Count the number of live regionservers
    int numRs = 0;
    for (RegionServerThread regionserverThread : CLUSTER.getLiveRegionServerThreads()) {
      if (regionserverThread.isAlive()) {
        numRs++;
      }
    }
    // Depending on the (random) order of unit execution we may run this unit before the
    // minicluster is fully up and recovered from the RS shutdown done during test init.
    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        ClusterMetrics metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
        Assert.assertNotNull(metrics);
        return metrics.getRegionCount() > 0;
      }
    });
    // Retrieve live servers and dead servers info.
    EnumSet<Option> options =
        EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME);
    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
    Assert.assertNotNull(metrics);
    // exclude a dead region server
    Assert.assertEquals(SLAVES - 1, numRs);
    // live servers = nums of regionservers
    // By default, HMaster don't carry any regions so it won't report its load.
    // Hence, it won't be in the server list.
    Assert.assertEquals(numRs, metrics.getLiveServerMetrics().size());
    Assert.assertTrue(metrics.getRegionCount() > 0);
    Assert.assertNotNull(metrics.getDeadServerNames());
    Assert.assertEquals(1, metrics.getDeadServerNames().size());
    ServerName deadServerName = metrics.getDeadServerNames().iterator().next();
    Assert.assertEquals(DEAD.getServerName(), deadServerName);
    Assert.assertNotNull(metrics.getServersName());
    Assert.assertEquals(numRs, metrics.getServersName().size());
  }

  /**
   * Creates the test table, writes three rows and checks the per-table region state counts
   * reported for the meta table and for the test table. The table is always deleted again,
   * even when an assertion fails, because {@link #testUserMetrics()} reuses the same name.
   */
  @Test
  public void testRegionStatesCount() throws Exception {
    try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
      try {
        table.put(new Put(Bytes.toBytes("k1"))
            .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
        table.put(new Put(Bytes.toBytes("k2"))
            .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
        table.put(new Put(Bytes.toBytes("k3"))
            .addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3")));

        ClusterMetrics metrics = ADMIN.getClusterMetrics();
        Assert.assertEquals(3, metrics.getTableRegionStatesCount().size());

        RegionStatesCount metaCounts =
            metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME);
        Assert.assertEquals(0, metaCounts.getRegionsInTransition());
        Assert.assertEquals(1, metaCounts.getOpenRegions());
        Assert.assertEquals(1, metaCounts.getTotalRegions());
        Assert.assertEquals(0, metaCounts.getClosedRegions());
        Assert.assertEquals(0, metaCounts.getSplitRegions());

        RegionStatesCount testTableCounts =
            metrics.getTableRegionStatesCount().get(TABLE_NAME);
        Assert.assertEquals(0, testTableCounts.getRegionsInTransition());
        Assert.assertEquals(1, testTableCounts.getOpenRegions());
        Assert.assertEquals(1, testTableCounts.getTotalRegions());
      } finally {
        UTIL.deleteTable(TABLE_NAME);
      }
    }
  }

  /**
   * Verifies that exactly one of the master threads is active and that the metrics report it
   * as master, with the remaining {@code MASTERS - 1} masters listed as backups.
   */
  @Test
  public void testMasterAndBackupMastersStatus() throws Exception {
    // get all the master threads
    List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
    int numActive = 0;
    ServerName activeName = null;
    HMaster active = null;
    for (MasterThread masterThread : masterThreads) {
      if (masterThread.getMaster().isActiveMaster()) {
        numActive++;
        active = masterThread.getMaster();
        activeName = active.getServerName();
      }
    }
    Assert.assertNotNull(active);
    Assert.assertEquals(1, numActive);
    Assert.assertEquals(MASTERS, masterThreads.size());
    // Retrieve master and backup masters infos only.
    EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS);
    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
    Assert.assertEquals(activeName, metrics.getMasterName());
    Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
  }

  /**
   * Runs one put, one get and one fully filtered scan as three different test users and checks
   * the per-user request counters reported for the first live server. Requests that hit the
   * meta region while each user was active are measured and subtracted before comparing.
   * Returns immediately when per-user metrics are disabled in the configuration.
   */
  @Test
  public void testUserMetrics() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    // If metrics for users is not enabled, this test doesn't make sense.
    if (!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
        MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)) {
      return;
    }
    User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
    User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
    User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
    // Only the table's existence matters here; release the returned handle right away.
    UTIL.createTable(TABLE_NAME, CF).close();
    try {
      waitForUsersMetrics(0);
      long metaWritesBeforeFoo = getMetaMetrics().getWriteRequestCount();
      userFoo.runAs(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          try {
            doPut();
          } catch (IOException e) {
            throw new AssertionError("Exception:" + e.getMessage(), e);
          }
          return null;
        }
      });
      waitForUsersMetrics(1);
      long metaWritesDuringFoo = getMetaMetrics().getWriteRequestCount() - metaWritesBeforeFoo;

      long metaReadsBeforeBar = getMetaMetrics().getReadRequestCount();
      userBar.runAs(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          try {
            doGet();
          } catch (IOException e) {
            throw new AssertionError("Exception:" + e.getMessage(), e);
          }
          return null;
        }
      });
      waitForUsersMetrics(2);
      long metaReadsDuringBar = getMetaMetrics().getReadRequestCount() - metaReadsBeforeBar;

      long metaFilteredBeforeTest = getMetaMetrics().getFilteredReadRequestCount();
      userTest.runAs(new PrivilegedAction<Void>() {
        @Override
        public Void run() {
          // Close both the connection and the table once the scan is done.
          try (Connection conn = createConnection(UTIL.getConfiguration());
              Table table = conn.getTable(TABLE_NAME)) {
            for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
              Assert.fail("Should have filtered all rows");
            }
          } catch (IOException e) {
            throw new AssertionError("Exception:" + e.getMessage(), e);
          }
          return null;
        }
      });
      waitForUsersMetrics(3);
      long metaFilteredDuringTest =
          getMetaMetrics().getFilteredReadRequestCount() - metaFilteredBeforeTest;

      verifyUserMetrics(conf, firstLiveServerUserMetrics(), metaWritesDuringFoo,
          metaReadsDuringBar, metaFilteredDuringTest);
    } finally {
      UTIL.deleteTable(TABLE_NAME);
    }
  }

  /**
   * Asserts the expected counters for every user present in {@code userMap}. Any user other
   * than the three test users must be the current user.
   *
   * @param conf configuration used to resolve the current user
   * @param userMap per-user metrics keyed by user name bytes
   * @param metaWritesDuringFoo meta write requests observed while the FOO user was active
   * @param metaReadsDuringBar meta read requests observed while the BAR user was active
   * @param metaFilteredDuringTest meta filtered reads observed while the TEST user was active
   */
  private void verifyUserMetrics(Configuration conf, Map<byte[], UserMetrics> userMap,
      long metaWritesDuringFoo, long metaReadsDuringBar, long metaFilteredDuringTest)
      throws IOException {
    for (Map.Entry<byte[], UserMetrics> entry : userMap.entrySet()) {
      String userName = Bytes.toString(entry.getKey());
      UserMetrics userMetrics = entry.getValue();
      switch (userName) {
        case "FOO_USER_METRIC_TEST":
          Assert.assertEquals(1, userMetrics.getWriteRequestCount() - metaWritesDuringFoo);
          break;
        case "BAR_USER_METRIC_TEST":
          Assert.assertEquals(1, userMetrics.getReadRequestCount() - metaReadsDuringBar);
          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
          break;
        case "TEST_USER_METRIC_TEST":
          Assert.assertEquals(1, userMetrics.getFilteredReadRequests() - metaFilteredDuringTest);
          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
          break;
        default:
          //current user
          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(), userName);
          //Read/write count because of Meta operations
          Assert.assertTrue(userMetrics.getReadRequestCount() > 1);
          break;
      }
    }
  }

  /**
   * Returns the user metrics of the first entry in the live-server metrics map, requesting
   * only {@link Option#LIVE_SERVERS} from the master.
   */
  private static Map<byte[], UserMetrics> firstLiveServerUserMetrics() throws IOException {
    return ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
        .values().iterator().next().getUserMetrics();
  }

  /**
   * Looks through the live servers for the one that reports metrics for the first meta region.
   *
   * @return the region metrics of the first meta region
   * @throws AssertionError if no live server reports that region
   */
  private RegionMetrics getMetaMetrics() throws IOException {
    for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
        .getLiveServerMetrics().values()) {
      RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
          .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
      if (metaMetrics != null) {
        return metaMetrics;
      }
    }
    throw new AssertionError("Should have find meta metrics");
  }

  /**
   * Sleeps five seconds, then polls (for up to ten seconds) until the first live server
   * reports metrics for more than {@code noOfUsers} users.
   *
   * @param noOfUsers number of test users expected so far, not counting the current user
   */
  private void waitForUsersMetrics(int noOfUsers) throws Exception {
    //Sleep for metrics to get updated on master
    Thread.sleep(5000);
    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        Map<byte[], UserMetrics> metrics = firstLiveServerUserMetrics();
        Assert.assertNotNull(metrics);
        //including current user + noOfUsers
        return metrics.size() > noOfUsers;
      }
    });
  }

  /** Writes a single cell to row "a" of the test table over a fresh connection. */
  private void doPut() throws IOException {
    try (Connection conn = createConnection(UTIL.getConfiguration());
        Table table = conn.getTable(TABLE_NAME)) {
      table
          .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
    }
  }

  /** Reads column "col1" of row "a" from the test table over a fresh connection. */
  private void doGet() throws IOException {
    try (Connection conn = createConnection(UTIL.getConfiguration());
        Table table = conn.getTable(TABLE_NAME)) {
      table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
    }
  }

  /**
   * Opens a new connection as the user that is current at call time, so that it picks up the
   * identity established by an enclosing {@code User.runAs}. The caller must close it.
   */
  private Connection createConnection(Configuration conf) throws IOException {
    User user = UserProvider.instantiate(conf).getCurrent();
    return ConnectionFactory.createConnection(conf, user);
  }

  /**
   * Requests only coprocessor names, version, cluster id and balancer state, and checks that
   * they are populated while the average load (no live servers requested) is zero.
   */
  @Test
  public void testOtherStatusInfos() throws Exception {
    EnumSet<Option> options =
        EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION,
                   Option.CLUSTER_ID, Option.BALANCER_ON);
    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
    Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size());
    Assert.assertNotNull(metrics.getHBaseVersion());
    Assert.assertNotNull(metrics.getClusterId());
    Assert.assertEquals(0.0, metrics.getAverageLoad(), 0.0);
    Assert.assertNotNull(metrics.getBalancerOn());
  }

  /**
   * Closes the shared admin and shuts the mini cluster down. Both references are null-checked
   * so that a failure early in setup is not masked by a NullPointerException here.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (ADMIN != null) {
      ADMIN.close();
    }
    if (UTIL != null) {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Checks that one {@code getClusterMetrics()} call lists {@link MyObserver} among the master
   * coprocessors and bumps both of its hook counters by exactly one.
   */
  @Test
  public void testObserver() throws IOException {
    int preCount = MyObserver.PRE_COUNT.get();
    int postCount = MyObserver.POST_COUNT.get();
    Assert.assertTrue(ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream()
        .anyMatch(s -> s.equals(MyObserver.class.getSimpleName())));
    Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get());
    Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get());
  }

  /**
   * Writes {@code rowCount} rows to {@code tableName}, keyed by the decimal string of
   * {@code startRow + i}, each with qualifier "val1" holding the int {@code i}.
   */
  private static void insertData(final TableName tableName, int startRow, int rowCount)
      throws IOException {
    try (Table t = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < rowCount; i++) {
        Put p = new Put(Bytes.toBytes("" + (startRow + i)));
        p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i));
        t.put(p);
      }
    }
  }

  /**
   * Master coprocessor that counts how often the pre/post {@code getClusterMetrics} hooks
   * are invoked.
   */
  public static class MyObserver implements MasterCoprocessor, MasterObserver {
    private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
    private static final AtomicInteger POST_COUNT = new AtomicInteger(0);

    @Override
    public Optional<MasterObserver> getMasterObserver() {
      return Optional.of(this);
    }

    @Override
    public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx)
        throws IOException {
      PRE_COUNT.incrementAndGet();
    }

    @Override
    public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx,
        ClusterMetrics metrics) throws IOException {
      POST_COUNT.incrementAndGet();
    }
  }
}