001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.security.PrivilegedAction; 022import java.util.EnumSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.ClusterMetrics.Option; 031import org.apache.hadoop.hbase.Waiter.Predicate; 032import org.apache.hadoop.hbase.client.Admin; 033import org.apache.hadoop.hbase.client.AsyncAdmin; 034import org.apache.hadoop.hbase.client.AsyncConnection; 035import org.apache.hadoop.hbase.client.Connection; 036import org.apache.hadoop.hbase.client.ConnectionFactory; 037import org.apache.hadoop.hbase.client.Get; 038import org.apache.hadoop.hbase.client.Put; 039import org.apache.hadoop.hbase.client.RegionInfoBuilder; 040import org.apache.hadoop.hbase.client.RegionStatesCount; 041import org.apache.hadoop.hbase.client.Result; 042import org.apache.hadoop.hbase.client.Scan; 043import org.apache.hadoop.hbase.client.Table; 044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 045import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 047import org.apache.hadoop.hbase.coprocessor.MasterObserver; 048import org.apache.hadoop.hbase.coprocessor.ObserverContext; 049import org.apache.hadoop.hbase.filter.FilterAllFilter; 050import org.apache.hadoop.hbase.master.HMaster; 051import org.apache.hadoop.hbase.regionserver.HRegionServer; 052import org.apache.hadoop.hbase.security.User; 053import org.apache.hadoop.hbase.security.UserProvider; 054import org.apache.hadoop.hbase.testclassification.MediumTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; 057import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 058import org.junit.AfterClass; 059import org.junit.Assert; 060import org.junit.BeforeClass; 061import org.junit.ClassRule; 062import org.junit.Test; 063import org.junit.experimental.categories.Category; 064 065@Category(MediumTests.class) 066public class TestClientClusterMetrics { 067 068 @ClassRule 069 public static final HBaseClassTestRule CLASS_RULE = 070 HBaseClassTestRule.forClass(TestClientClusterMetrics.class); 071 072 private static HBaseTestingUtility UTIL; 073 private static Admin ADMIN; 074 private final static int SLAVES = 5; 075 private final static int MASTERS = 3; 076 private static MiniHBaseCluster CLUSTER; 077 private static HRegionServer DEAD; 078 private static final TableName TABLE_NAME = TableName.valueOf("test"); 079 private static final byte[] CF = Bytes.toBytes("cf"); 080 081 082 @BeforeClass 083 public static void setUpBeforeClass() throws Exception { 084 Configuration conf = HBaseConfiguration.create(); 085 conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName()); 086 UTIL = new HBaseTestingUtility(conf); 087 StartMiniClusterOption option = StartMiniClusterOption.builder() 088 .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build(); 089 UTIL.startMiniCluster(option); 090 CLUSTER = UTIL.getHBaseCluster(); 091 CLUSTER.waitForActiveAndReadyMaster(); 092 ADMIN = UTIL.getAdmin(); 093 // Kill one region server 094 List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads(); 095 RegionServerThread rst = rsts.get(rsts.size() - 1); 096 DEAD = rst.getRegionServer(); 097 DEAD.stop("Test dead servers metrics"); 098 while (rst.isAlive()) { 099 Thread.sleep(500); 100 } 101 } 102 103 @Test 104 public void testDefaults() throws Exception { 105 ClusterMetrics origin = ADMIN.getClusterMetrics(); 106 ClusterMetrics defaults = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class)); 107 Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion()); 108 Assert.assertEquals(origin.getClusterId(), defaults.getClusterId()); 109 Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0); 110 Assert.assertEquals(origin.getBackupMasterNames().size(), 111 defaults.getBackupMasterNames().size()); 112 Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size()); 113 Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount()); 114 Assert.assertEquals(origin.getLiveServerMetrics().size(), 115 defaults.getLiveServerMetrics().size()); 116 Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort()); 117 Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size()); 118 Assert.assertEquals(ADMIN.getRegionServers().size(), defaults.getServersName().size()); 119 } 120 121 @Test 122 public void testAsyncClient() throws Exception { 123 try (AsyncConnection asyncConnect = ConnectionFactory.createAsyncConnection( 124 UTIL.getConfiguration()).get()) { 125 AsyncAdmin asyncAdmin = asyncConnect.getAdmin(); 126 CompletableFuture<ClusterMetrics> originFuture = 127 asyncAdmin.getClusterMetrics(); 128 CompletableFuture<ClusterMetrics> defaultsFuture = 129 asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class)); 130 ClusterMetrics origin = originFuture.get(); 131 ClusterMetrics defaults = defaultsFuture.get(); 132 Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion()); 133 Assert.assertEquals(origin.getClusterId(), defaults.getClusterId()); 134 Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion()); 135 Assert.assertEquals(origin.getClusterId(), defaults.getClusterId()); 136 Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0); 137 Assert.assertEquals(origin.getBackupMasterNames().size(), 138 defaults.getBackupMasterNames().size()); 139 Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size()); 140 Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount()); 141 Assert.assertEquals(origin.getLiveServerMetrics().size(), 142 defaults.getLiveServerMetrics().size()); 143 Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort()); 144 Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size()); 145 origin.getTableRegionStatesCount().forEach(((tableName, regionStatesCount) -> { 146 RegionStatesCount defaultRegionStatesCount = defaults.getTableRegionStatesCount() 147 .get(tableName); 148 Assert.assertEquals(defaultRegionStatesCount, regionStatesCount); 149 })); 150 } 151 } 152 153 @Test 154 public void testLiveAndDeadServersStatus() throws Exception { 155 // Count the number of live regionservers 156 List<RegionServerThread> regionserverThreads = CLUSTER.getLiveRegionServerThreads(); 157 int numRs = 0; 158 int len = regionserverThreads.size(); 159 for (int i = 0; i < len; i++) { 160 if (regionserverThreads.get(i).isAlive()) { 161 numRs++; 162 } 163 } 164 // Depending on the (random) order of unit execution we may run this unit before the 165 // minicluster is fully up and recovered from the RS shutdown done during test init. 166 Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() { 167 @Override 168 public boolean evaluate() throws Exception { 169 ClusterMetrics metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); 170 Assert.assertNotNull(metrics); 171 return metrics.getRegionCount() > 0; 172 } 173 }); 174 // Retrieve live servers and dead servers info. 175 EnumSet<Option> options = 176 EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME); 177 ClusterMetrics metrics = ADMIN.getClusterMetrics(options); 178 Assert.assertNotNull(metrics); 179 // exclude a dead region server 180 Assert.assertEquals(SLAVES -1, numRs); 181 // live servers = nums of regionservers 182 // By default, HMaster don't carry any regions so it won't report its load. 183 // Hence, it won't be in the server list. 184 Assert.assertEquals(numRs, metrics.getLiveServerMetrics().size()); 185 Assert.assertTrue(metrics.getRegionCount() > 0); 186 Assert.assertNotNull(metrics.getDeadServerNames()); 187 Assert.assertEquals(1, metrics.getDeadServerNames().size()); 188 ServerName deadServerName = metrics.getDeadServerNames().iterator().next(); 189 Assert.assertEquals(DEAD.getServerName(), deadServerName); 190 Assert.assertNotNull(metrics.getServersName()); 191 Assert.assertEquals(numRs, metrics.getServersName().size()); 192 } 193 194 @Test 195 public void testRegionStatesCount() throws Exception { 196 Table table = UTIL.createTable(TABLE_NAME, CF); 197 table.put(new Put(Bytes.toBytes("k1")) 198 .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1"))); 199 table.put(new Put(Bytes.toBytes("k2")) 200 .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2"))); 201 table.put(new Put(Bytes.toBytes("k3")) 202 .addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3"))); 203 204 ClusterMetrics metrics = ADMIN.getClusterMetrics(); 205 Assert.assertEquals(metrics.getTableRegionStatesCount().size(), 3); 206 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME) 207 .getRegionsInTransition(), 0); 208 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME) 209 .getOpenRegions(), 1); 210 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME) 211 .getTotalRegions(), 1); 212 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME) 213 .getClosedRegions(), 0); 214 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME) 215 .getSplitRegions(), 0); 216 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME) 217 .getRegionsInTransition(), 0); 218 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME) 219 .getOpenRegions(), 1); 220 Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME) 221 .getTotalRegions(), 1); 222 223 UTIL.deleteTable(TABLE_NAME); 224 } 225 226 @Test 227 public void testMasterAndBackupMastersStatus() throws Exception { 228 // get all the master threads 229 List<MasterThread> masterThreads = CLUSTER.getMasterThreads(); 230 int numActive = 0; 231 int activeIndex = 0; 232 ServerName activeName = null; 233 HMaster active = null; 234 for (int i = 0; i < masterThreads.size(); i++) { 235 if (masterThreads.get(i).getMaster().isActiveMaster()) { 236 numActive++; 237 activeIndex = i; 238 active = masterThreads.get(activeIndex).getMaster(); 239 activeName = active.getServerName(); 240 } 241 } 242 Assert.assertNotNull(active); 243 Assert.assertEquals(1, numActive); 244 Assert.assertEquals(MASTERS, masterThreads.size()); 245 // Retrieve master and backup masters infos only. 246 EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS); 247 ClusterMetrics metrics = ADMIN.getClusterMetrics(options); 248 Assert.assertTrue(metrics.getMasterName().equals(activeName)); 249 Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size()); 250 } 251 252 @Test public void testUserMetrics() throws Exception { 253 Configuration conf = UTIL.getConfiguration(); 254 User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]); 255 User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]); 256 User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]); 257 UTIL.createTable(TABLE_NAME, CF); 258 waitForUsersMetrics(0); 259 long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount(); 260 userFoo.runAs(new PrivilegedAction<Void>() { 261 @Override public Void run() { 262 try { 263 doPut(); 264 } catch (IOException e) { 265 Assert.fail("Exception:" + e.getMessage()); 266 } 267 return null; 268 } 269 }); 270 waitForUsersMetrics(1); 271 long writeMetaMetricForUserFoo = 272 getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser; 273 long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount(); 274 userBar.runAs(new PrivilegedAction<Void>() { 275 @Override public Void run() { 276 try { 277 doGet(); 278 } catch (IOException e) { 279 Assert.fail("Exception:" + e.getMessage()); 280 } 281 return null; 282 } 283 }); 284 waitForUsersMetrics(2); 285 long readMetaMetricForUserBar = 286 getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser; 287 long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount(); 288 userTest.runAs(new PrivilegedAction<Void>() { 289 @Override public Void run() { 290 try { 291 Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME); 292 for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) { 293 Assert.fail("Should have filtered all rows"); 294 } 295 } catch (IOException e) { 296 Assert.fail("Exception:" + e.getMessage()); 297 } 298 return null; 299 } 300 }); 301 waitForUsersMetrics(3); 302 long filteredMetaReqeustForTestUser = 303 getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust; 304 Map<byte[], UserMetrics> userMap = 305 ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values() 306 .iterator().next().getUserMetrics(); 307 for (byte[] user : userMap.keySet()) { 308 switch (Bytes.toString(user)) { 309 case "FOO_USER_METRIC_TEST": 310 Assert.assertEquals(1, 311 userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo); 312 break; 313 case "BAR_USER_METRIC_TEST": 314 Assert 315 .assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar); 316 Assert.assertEquals(0, userMap.get(user).getWriteRequestCount()); 317 break; 318 case "TEST_USER_METRIC_TEST": 319 Assert.assertEquals(1, 320 userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser); 321 Assert.assertEquals(0, userMap.get(user).getWriteRequestCount()); 322 break; 323 default: 324 //current user 325 Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(), 326 Bytes.toString(user)); 327 //Read/write count because of Meta operations 328 Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1); 329 break; 330 } 331 } 332 UTIL.deleteTable(TABLE_NAME); 333 } 334 335 private RegionMetrics getMetaMetrics() throws IOException { 336 for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) 337 .getLiveServerMetrics().values()) { 338 RegionMetrics metaMetrics = serverMetrics.getRegionMetrics() 339 .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName()); 340 if (metaMetrics != null) { 341 return metaMetrics; 342 } 343 } 344 Assert.fail("Should have find meta metrics"); 345 return null; 346 } 347 348 private void waitForUsersMetrics(int noOfUsers) throws Exception { 349 //Sleep for metrics to get updated on master 350 Thread.sleep(5000); 351 Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() { 352 @Override public boolean evaluate() throws Exception { 353 Map<byte[], UserMetrics> metrics = 354 ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values() 355 .iterator().next().getUserMetrics(); 356 Assert.assertNotNull(metrics); 357 //including current user + noOfUsers 358 return metrics.keySet().size() > noOfUsers; 359 } 360 }); 361 } 362 363 private void doPut() throws IOException { 364 try (Connection conn = createConnection(UTIL.getConfiguration())) { 365 Table table = conn.getTable(TABLE_NAME); 366 table 367 .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1"))); 368 } 369 } 370 371 private void doGet() throws IOException { 372 try (Connection conn = createConnection(UTIL.getConfiguration())) { 373 Table table = conn.getTable(TABLE_NAME); 374 table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"))); 375 } 376 } 377 378 private Connection createConnection(Configuration conf) throws IOException { 379 User user = UserProvider.instantiate(conf).getCurrent(); 380 return ConnectionFactory.createConnection(conf, user); 381 } 382 383 @Test 384 public void testOtherStatusInfos() throws Exception { 385 EnumSet<Option> options = 386 EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION, 387 Option.CLUSTER_ID, Option.BALANCER_ON); 388 ClusterMetrics metrics = ADMIN.getClusterMetrics(options); 389 Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size()); 390 Assert.assertNotNull(metrics.getHBaseVersion()); 391 Assert.assertNotNull(metrics.getClusterId()); 392 Assert.assertTrue(metrics.getAverageLoad() == 0.0); 393 Assert.assertNotNull(metrics.getBalancerOn()); 394 } 395 396 @AfterClass 397 public static void tearDownAfterClass() throws Exception { 398 if (ADMIN != null) { 399 ADMIN.close(); 400 } 401 UTIL.shutdownMiniCluster(); 402 } 403 404 @Test 405 public void testObserver() throws IOException { 406 int preCount = MyObserver.PRE_COUNT.get(); 407 int postCount = MyObserver.POST_COUNT.get(); 408 Assert.assertTrue(ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream() 409 .anyMatch(s -> s.equals(MyObserver.class.getSimpleName()))); 410 Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get()); 411 Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get()); 412 } 413 414 private static void insertData(final TableName tableName, int startRow, int rowCount) 415 throws IOException { 416 Table t = UTIL.getConnection().getTable(tableName); 417 Put p; 418 for (int i = 0; i < rowCount; i++) { 419 p = new Put(Bytes.toBytes("" + (startRow + i))); 420 p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i)); 421 t.put(p); 422 } 423 } 424 425 public static class MyObserver implements MasterCoprocessor, MasterObserver { 426 private static final AtomicInteger PRE_COUNT = new AtomicInteger(0); 427 private static final AtomicInteger POST_COUNT = new AtomicInteger(0); 428 429 @Override public Optional<MasterObserver> getMasterObserver() { 430 return Optional.of(this); 431 } 432 433 @Override public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx) 434 throws IOException { 435 PRE_COUNT.incrementAndGet(); 436 } 437 438 @Override public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx, 439 ClusterMetrics metrics) throws IOException { 440 POST_COUNT.incrementAndGet(); 441 } 442 } 443}