/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.EnumSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.atomic.AtomicInteger;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.ClusterMetrics.Option;
030import org.apache.hadoop.hbase.Waiter.Predicate;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.AsyncAdmin;
033import org.apache.hadoop.hbase.client.AsyncConnection;
034import org.apache.hadoop.hbase.client.Connection;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.client.Get;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.RegionInfoBuilder;
039import org.apache.hadoop.hbase.client.RegionStatesCount;
040import org.apache.hadoop.hbase.client.Result;
041import org.apache.hadoop.hbase.client.Scan;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
044import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
045import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
046import org.apache.hadoop.hbase.coprocessor.MasterObserver;
047import org.apache.hadoop.hbase.coprocessor.ObserverContext;
048import org.apache.hadoop.hbase.filter.FilterAllFilter;
049import org.apache.hadoop.hbase.master.HMaster;
050import org.apache.hadoop.hbase.monitoring.TaskMonitor;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.security.UserProvider;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
058import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
059import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
060import org.junit.AfterClass;
061import org.junit.Assert;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066
067@Category(MediumTests.class)
068public class TestClientClusterMetrics {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestClientClusterMetrics.class);
073
074  private static HBaseTestingUtility UTIL;
075  private static Admin ADMIN;
076  private final static int SLAVES = 5;
077  private final static int MASTERS = 3;
078  private static MiniHBaseCluster CLUSTER;
079  private static HRegionServer DEAD;
080  private static final TableName TABLE_NAME = TableName.valueOf("test");
081  private static final byte[] CF = Bytes.toBytes("cf");
082
083  // We need to promote the visibility of tryRegionServerReport for this test
084  public static class MyRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
085    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
086      super(conf);
087    }
088
089    @Override
090    public void tryRegionServerReport(long reportStartTime, long reportEndTime) throws IOException {
091      super.tryRegionServerReport(reportStartTime, reportEndTime);
092    }
093  }
094
095  @BeforeClass
096  public static void setUpBeforeClass() throws Exception {
097    Configuration conf = HBaseConfiguration.create();
098    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
099    UTIL = new HBaseTestingUtility(conf);
100    StartMiniClusterOption option =
101      StartMiniClusterOption.builder().rsClass(TestClientClusterMetrics.MyRegionServer.class)
102        .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
103    UTIL.startMiniCluster(option);
104    CLUSTER = UTIL.getHBaseCluster();
105    CLUSTER.waitForActiveAndReadyMaster();
106    ADMIN = UTIL.getAdmin();
107    // Kill one region server
108    List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
109    RegionServerThread rst = rsts.get(rsts.size() - 1);
110    DEAD = rst.getRegionServer();
111    DEAD.stop("Test dead servers metrics");
112    while (rst.isAlive()) {
113      Thread.sleep(500);
114    }
115  }
116
117  @Test
118  public void testDefaults() throws Exception {
119    ClusterMetrics origin = ADMIN.getClusterMetrics();
120    ClusterMetrics defaults = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class));
121    Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
122    Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
123    Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
124    Assert.assertEquals(origin.getBackupMasterNames().size(),
125      defaults.getBackupMasterNames().size());
126    Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
127    Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
128    Assert.assertEquals(origin.getLiveServerMetrics().size(),
129      defaults.getLiveServerMetrics().size());
130    Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
131    Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
132    Assert.assertEquals(ADMIN.getRegionServers().size(), defaults.getServersName().size());
133  }
134
135  @Test
136  public void testAsyncClient() throws Exception {
137    try (AsyncConnection asyncConnect =
138      ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
139      AsyncAdmin asyncAdmin = asyncConnect.getAdmin();
140      CompletableFuture<ClusterMetrics> originFuture = asyncAdmin.getClusterMetrics();
141      CompletableFuture<ClusterMetrics> defaultsFuture =
142        asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class));
143      ClusterMetrics origin = originFuture.get();
144      ClusterMetrics defaults = defaultsFuture.get();
145      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
146      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
147      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
148      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
149      Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
150      Assert.assertEquals(origin.getBackupMasterNames().size(),
151        defaults.getBackupMasterNames().size());
152      Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
153      Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
154      Assert.assertEquals(origin.getLiveServerMetrics().size(),
155        defaults.getLiveServerMetrics().size());
156      Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
157      Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
158      origin.getTableRegionStatesCount().forEach(((tableName, regionStatesCount) -> {
159        RegionStatesCount defaultRegionStatesCount =
160          defaults.getTableRegionStatesCount().get(tableName);
161        Assert.assertEquals(defaultRegionStatesCount, regionStatesCount);
162      }));
163    }
164  }
165
166  @Test
167  public void testLiveAndDeadServersStatus() throws Exception {
168    // Count the number of live regionservers
169    List<RegionServerThread> regionserverThreads = CLUSTER.getLiveRegionServerThreads();
170    int numRs = 0;
171    int len = regionserverThreads.size();
172    for (int i = 0; i < len; i++) {
173      if (regionserverThreads.get(i).isAlive()) {
174        numRs++;
175      }
176    }
177    // Depending on the (random) order of unit execution we may run this unit before the
178    // minicluster is fully up and recovered from the RS shutdown done during test init.
179    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
180      @Override
181      public boolean evaluate() throws Exception {
182        ClusterMetrics metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
183        Assert.assertNotNull(metrics);
184        return metrics.getRegionCount() > 0;
185      }
186    });
187    // Retrieve live servers and dead servers info.
188    EnumSet<Option> options =
189      EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME);
190    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
191    Assert.assertNotNull(metrics);
192    // exclude a dead region server
193    Assert.assertEquals(SLAVES - 1, numRs);
194    // live servers = nums of regionservers
195    // By default, HMaster don't carry any regions so it won't report its load.
196    // Hence, it won't be in the server list.
197    Assert.assertEquals(numRs, metrics.getLiveServerMetrics().size());
198    Assert.assertTrue(metrics.getRegionCount() > 0);
199    Assert.assertNotNull(metrics.getDeadServerNames());
200    Assert.assertEquals(1, metrics.getDeadServerNames().size());
201    ServerName deadServerName = metrics.getDeadServerNames().iterator().next();
202    Assert.assertEquals(DEAD.getServerName(), deadServerName);
203    Assert.assertNotNull(metrics.getServersName());
204    Assert.assertEquals(numRs, metrics.getServersName().size());
205  }
206
207  @Test
208  public void testRegionStatesCount() throws Exception {
209    Table table = UTIL.createTable(TABLE_NAME, CF);
210    table.put(new Put(Bytes.toBytes("k1")).addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
211    table.put(new Put(Bytes.toBytes("k2")).addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
212    table.put(new Put(Bytes.toBytes("k3")).addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3")));
213
214    ClusterMetrics metrics = ADMIN.getClusterMetrics();
215    Assert.assertEquals(metrics.getTableRegionStatesCount().size(), 3);
216    Assert.assertEquals(
217      metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME).getRegionsInTransition(),
218      0);
219    Assert.assertEquals(
220      metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME).getOpenRegions(), 1);
221    Assert.assertEquals(
222      metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME).getTotalRegions(), 1);
223    Assert.assertEquals(
224      metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME).getClosedRegions(), 0);
225    Assert.assertEquals(
226      metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME).getSplitRegions(), 0);
227    Assert.assertEquals(
228      metrics.getTableRegionStatesCount().get(TABLE_NAME).getRegionsInTransition(), 0);
229    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME).getOpenRegions(), 1);
230    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME).getTotalRegions(), 1);
231
232    UTIL.deleteTable(TABLE_NAME);
233  }
234
235  @Test
236  public void testMasterAndBackupMastersStatus() throws Exception {
237    // get all the master threads
238    List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
239    int numActive = 0;
240    int activeIndex = 0;
241    ServerName activeName = null;
242    HMaster active = null;
243    for (int i = 0; i < masterThreads.size(); i++) {
244      if (masterThreads.get(i).getMaster().isActiveMaster()) {
245        numActive++;
246        activeIndex = i;
247        active = masterThreads.get(activeIndex).getMaster();
248        activeName = active.getServerName();
249      }
250    }
251    Assert.assertNotNull(active);
252    Assert.assertEquals(1, numActive);
253    Assert.assertEquals(MASTERS, masterThreads.size());
254    // Retrieve master and backup masters infos only.
255    EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS);
256    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
257    Assert.assertTrue(metrics.getMasterName().equals(activeName));
258    Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
259  }
260
261  @Test
262  public void testUserMetrics() throws Exception {
263    Configuration conf = UTIL.getConfiguration();
264    // If metrics for users is not enabled, this test doesn't make sense.
265    if (
266      !conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
267        MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)
268    ) {
269      return;
270    }
271    User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
272    User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
273    User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
274    UTIL.createTable(TABLE_NAME, CF);
275    waitForUsersMetrics(0);
276    long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
277    userFoo.runAs(new PrivilegedAction<Void>() {
278      @Override
279      public Void run() {
280        try {
281          doPut();
282        } catch (IOException e) {
283          Assert.fail("Exception:" + e.getMessage());
284        }
285        return null;
286      }
287    });
288    waitForUsersMetrics(1);
289    long writeMetaMetricForUserFoo =
290      getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
291    long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
292    userBar.runAs(new PrivilegedAction<Void>() {
293      @Override
294      public Void run() {
295        try {
296          doGet();
297        } catch (IOException e) {
298          Assert.fail("Exception:" + e.getMessage());
299        }
300        return null;
301      }
302    });
303    waitForUsersMetrics(2);
304    long readMetaMetricForUserBar =
305      getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
306    long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
307    userTest.runAs(new PrivilegedAction<Void>() {
308      @Override
309      public Void run() {
310        try {
311          Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
312          for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
313            Assert.fail("Should have filtered all rows");
314          }
315        } catch (IOException e) {
316          Assert.fail("Exception:" + e.getMessage());
317        }
318        return null;
319      }
320    });
321    waitForUsersMetrics(3);
322    long filteredMetaReqeustForTestUser =
323      getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
324    Map<byte[], UserMetrics> userMap = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
325      .getLiveServerMetrics().values().iterator().next().getUserMetrics();
326    for (byte[] user : userMap.keySet()) {
327      switch (Bytes.toString(user)) {
328        case "FOO_USER_METRIC_TEST":
329          Assert.assertEquals(1,
330            userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
331          break;
332        case "BAR_USER_METRIC_TEST":
333          Assert.assertEquals(1,
334            userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
335          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
336          break;
337        case "TEST_USER_METRIC_TEST":
338          Assert.assertEquals(1,
339            userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
340          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
341          break;
342        default:
343          // current user
344          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
345            Bytes.toString(user));
346          // Read/write count because of Meta operations
347          Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
348          break;
349      }
350    }
351    UTIL.deleteTable(TABLE_NAME);
352  }
353
354  @Test
355  public void testServerTasks() throws Exception {
356    // TaskMonitor is a singleton per VM, so will be shared among all minicluster "servers",
357    // so we only need to look at the first live server's results to find it.
358    final String testTaskName = "TEST TASK";
359    TaskMonitor.get().createStatus(testTaskName).setStatus("Testing 1... 2... 3...");
360    // Of course, first we must trigger regionserver reports.
361    final long now = EnvironmentEdgeManager.currentTime();
362    final long last = now - 1000; // fake a period, or someone might div by zero
363    for (RegionServerThread rs : CLUSTER.getRegionServerThreads()) {
364      ((MyRegionServer) rs.getRegionServer()).tryRegionServerReport(last, now);
365    }
366    // Get status now
367    ClusterMetrics clusterMetrics = ADMIN.getClusterMetrics(EnumSet.of(Option.TASKS));
368    // The test task will be in the master metrics list
369    boolean found = false;
370    for (ServerTask task : clusterMetrics.getMasterTasks()) {
371      if (testTaskName.equals(task.getDescription())) {
372        // Found it
373        found = true;
374        break;
375      }
376    }
377    Assert.assertTrue("Expected task not found in master task list", found);
378    // Get the tasks information (carried in server metrics)
379    found = false;
380    for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
381      if (serverMetrics.getTasks() != null) {
382        for (ServerTask task : serverMetrics.getTasks()) {
383          if (testTaskName.equals(task.getDescription())) {
384            // Found it
385            found = true;
386            break;
387          }
388        }
389      }
390    }
391    // We will fall through here if getClusterMetrics(TASKS) did not correctly process the
392    // task list.
393    Assert.assertTrue("Expected task not found in server load", found);
394  }
395
396  private RegionMetrics getMetaMetrics() throws IOException {
397    for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
398      .getLiveServerMetrics().values()) {
399      RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
400        .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
401      if (metaMetrics != null) {
402        return metaMetrics;
403      }
404    }
405    Assert.fail("Should have find meta metrics");
406    return null;
407  }
408
409  private void waitForUsersMetrics(int noOfUsers) throws Exception {
410    // Sleep for metrics to get updated on master
411    Thread.sleep(5000);
412    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
413      @Override
414      public boolean evaluate() throws Exception {
415        Map<byte[], UserMetrics> metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
416          .getLiveServerMetrics().values().iterator().next().getUserMetrics();
417        Assert.assertNotNull(metrics);
418        // including current user + noOfUsers
419        return metrics.keySet().size() > noOfUsers;
420      }
421    });
422  }
423
424  private void doPut() throws IOException {
425    try (Connection conn = createConnection(UTIL.getConfiguration())) {
426      Table table = conn.getTable(TABLE_NAME);
427      table
428        .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
429    }
430  }
431
432  private void doGet() throws IOException {
433    try (Connection conn = createConnection(UTIL.getConfiguration())) {
434      Table table = conn.getTable(TABLE_NAME);
435      table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
436    }
437  }
438
439  private Connection createConnection(Configuration conf) throws IOException {
440    User user = UserProvider.instantiate(conf).getCurrent();
441    return ConnectionFactory.createConnection(conf, user);
442  }
443
444  @Test
445  public void testOtherStatusInfos() throws Exception {
446    EnumSet<Option> options = EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION,
447      Option.CLUSTER_ID, Option.BALANCER_ON);
448    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
449    Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size());
450    Assert.assertNotNull(metrics.getHBaseVersion());
451    Assert.assertNotNull(metrics.getClusterId());
452    Assert.assertTrue(metrics.getAverageLoad() == 0.0);
453    Assert.assertNotNull(metrics.getBalancerOn());
454  }
455
456  @AfterClass
457  public static void tearDownAfterClass() throws Exception {
458    if (ADMIN != null) {
459      ADMIN.close();
460    }
461    UTIL.shutdownMiniCluster();
462  }
463
464  @Test
465  public void testObserver() throws IOException {
466    int preCount = MyObserver.PRE_COUNT.get();
467    int postCount = MyObserver.POST_COUNT.get();
468    Assert.assertTrue(ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream()
469      .anyMatch(s -> s.equals(MyObserver.class.getSimpleName())));
470    Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get());
471    Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get());
472  }
473
474  private static void insertData(final TableName tableName, int startRow, int rowCount)
475    throws IOException {
476    Table t = UTIL.getConnection().getTable(tableName);
477    Put p;
478    for (int i = 0; i < rowCount; i++) {
479      p = new Put(Bytes.toBytes("" + (startRow + i)));
480      p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i));
481      t.put(p);
482    }
483  }
484
485  public static class MyObserver implements MasterCoprocessor, MasterObserver {
486    private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
487    private static final AtomicInteger POST_COUNT = new AtomicInteger(0);
488
489    @Override
490    public Optional<MasterObserver> getMasterObserver() {
491      return Optional.of(this);
492    }
493
494    @Override
495    public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx)
496      throws IOException {
497      PRE_COUNT.incrementAndGet();
498    }
499
500    @Override
501    public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx,
502      ClusterMetrics metrics) throws IOException {
503      POST_COUNT.incrementAndGet();
504    }
505  }
506}