001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
065
066@Category(MediumTests.class)
067public class TestClientClusterMetrics {
068
069  @ClassRule
070  public static final HBaseClassTestRule CLASS_RULE =
071      HBaseClassTestRule.forClass(TestClientClusterMetrics.class);
072
073  private static HBaseTestingUtility UTIL;
074  private static Admin ADMIN;
075  private final static int SLAVES = 5;
076  private final static int MASTERS = 3;
077  private static MiniHBaseCluster CLUSTER;
078  private static HRegionServer DEAD;
079  private static final TableName TABLE_NAME = TableName.valueOf("test");
080  private static final byte[] CF = Bytes.toBytes("cf");
081
082
083  @BeforeClass
084  public static void setUpBeforeClass() throws Exception {
085    Configuration conf = HBaseConfiguration.create();
086    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
087    UTIL = new HBaseTestingUtility(conf);
088    StartMiniClusterOption option = StartMiniClusterOption.builder()
089        .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
090    UTIL.startMiniCluster(option);
091    CLUSTER = UTIL.getHBaseCluster();
092    CLUSTER.waitForActiveAndReadyMaster();
093    ADMIN = UTIL.getAdmin();
094    // Kill one region server
095    List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
096    RegionServerThread rst = rsts.get(rsts.size() - 1);
097    DEAD = rst.getRegionServer();
098    DEAD.stop("Test dead servers metrics");
099    while (rst.isAlive()) {
100      Thread.sleep(500);
101    }
102  }
103
104  @Test
105  public void testDefaults() throws Exception {
106    ClusterMetrics origin = ADMIN.getClusterMetrics();
107    ClusterMetrics defaults = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class));
108    Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
109    Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
110    Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
111    Assert.assertEquals(origin.getBackupMasterNames().size(),
112        defaults.getBackupMasterNames().size());
113    Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
114    Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
115    Assert.assertEquals(origin.getLiveServerMetrics().size(),
116        defaults.getLiveServerMetrics().size());
117    Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
118    Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
119    Assert.assertEquals(ADMIN.getRegionServers().size(), defaults.getServersName().size());
120  }
121
122  @Test
123  public void testAsyncClient() throws Exception {
124    try (AsyncConnection asyncConnect = ConnectionFactory.createAsyncConnection(
125      UTIL.getConfiguration()).get()) {
126      AsyncAdmin asyncAdmin = asyncConnect.getAdmin();
127      CompletableFuture<ClusterMetrics> originFuture =
128        asyncAdmin.getClusterMetrics();
129      CompletableFuture<ClusterMetrics> defaultsFuture =
130        asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class));
131      ClusterMetrics origin = originFuture.get();
132      ClusterMetrics defaults = defaultsFuture.get();
133      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
134      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
135      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
136      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
137      Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
138      Assert.assertEquals(origin.getBackupMasterNames().size(),
139        defaults.getBackupMasterNames().size());
140      Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
141      Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
142      Assert.assertEquals(origin.getLiveServerMetrics().size(),
143        defaults.getLiveServerMetrics().size());
144      Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
145      Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
146      origin.getTableRegionStatesCount().forEach(((tableName, regionStatesCount) -> {
147        RegionStatesCount defaultRegionStatesCount = defaults.getTableRegionStatesCount()
148          .get(tableName);
149        Assert.assertEquals(defaultRegionStatesCount, regionStatesCount);
150      }));
151    }
152  }
153
154  @Test
155  public void testLiveAndDeadServersStatus() throws Exception {
156    // Count the number of live regionservers
157    List<RegionServerThread> regionserverThreads = CLUSTER.getLiveRegionServerThreads();
158    int numRs = 0;
159    int len = regionserverThreads.size();
160    for (int i = 0; i < len; i++) {
161      if (regionserverThreads.get(i).isAlive()) {
162        numRs++;
163      }
164    }
165    // Depending on the (random) order of unit execution we may run this unit before the
166    // minicluster is fully up and recovered from the RS shutdown done during test init.
167    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
168      @Override
169      public boolean evaluate() throws Exception {
170        ClusterMetrics metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
171        Assert.assertNotNull(metrics);
172        return metrics.getRegionCount() > 0;
173      }
174    });
175    // Retrieve live servers and dead servers info.
176    EnumSet<Option> options =
177        EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME);
178    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
179    Assert.assertNotNull(metrics);
180    // exclude a dead region server
181    Assert.assertEquals(SLAVES -1, numRs);
182    // live servers = nums of regionservers
183    // By default, HMaster don't carry any regions so it won't report its load.
184    // Hence, it won't be in the server list.
185    Assert.assertEquals(numRs, metrics.getLiveServerMetrics().size());
186    Assert.assertTrue(metrics.getRegionCount() > 0);
187    Assert.assertNotNull(metrics.getDeadServerNames());
188    Assert.assertEquals(1, metrics.getDeadServerNames().size());
189    ServerName deadServerName = metrics.getDeadServerNames().iterator().next();
190    Assert.assertEquals(DEAD.getServerName(), deadServerName);
191    Assert.assertNotNull(metrics.getServersName());
192    Assert.assertEquals(numRs, metrics.getServersName().size());
193  }
194
195  @Test
196  public void testRegionStatesCount() throws Exception {
197    Table table = UTIL.createTable(TABLE_NAME, CF);
198    table.put(new Put(Bytes.toBytes("k1"))
199      .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
200    table.put(new Put(Bytes.toBytes("k2"))
201      .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
202    table.put(new Put(Bytes.toBytes("k3"))
203      .addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3")));
204
205    ClusterMetrics metrics = ADMIN.getClusterMetrics();
206    Assert.assertEquals(metrics.getTableRegionStatesCount().size(), 3);
207    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
208      .getRegionsInTransition(), 0);
209    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
210      .getOpenRegions(), 1);
211    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
212      .getTotalRegions(), 1);
213    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
214      .getClosedRegions(), 0);
215    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
216      .getSplitRegions(), 0);
217    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
218      .getRegionsInTransition(), 0);
219    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
220      .getOpenRegions(), 1);
221    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
222      .getTotalRegions(), 1);
223
224    UTIL.deleteTable(TABLE_NAME);
225  }
226
227  @Test
228  public void testMasterAndBackupMastersStatus() throws Exception {
229    // get all the master threads
230    List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
231    int numActive = 0;
232    int activeIndex = 0;
233    ServerName activeName = null;
234    HMaster active = null;
235    for (int i = 0; i < masterThreads.size(); i++) {
236      if (masterThreads.get(i).getMaster().isActiveMaster()) {
237        numActive++;
238        activeIndex = i;
239        active = masterThreads.get(activeIndex).getMaster();
240        activeName = active.getServerName();
241      }
242    }
243    Assert.assertNotNull(active);
244    Assert.assertEquals(1, numActive);
245    Assert.assertEquals(MASTERS, masterThreads.size());
246    // Retrieve master and backup masters infos only.
247    EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS);
248    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
249    Assert.assertTrue(metrics.getMasterName().equals(activeName));
250    Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
251  }
252
253  @Test public void testUserMetrics() throws Exception {
254    Configuration conf = UTIL.getConfiguration();
255    // If metrics for users is not enabled, this test doesn't  make sense.
256    if (!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
257        MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)) {
258      return;
259    }
260    User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
261    User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
262    User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
263    UTIL.createTable(TABLE_NAME, CF);
264    waitForUsersMetrics(0);
265    long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
266    userFoo.runAs(new PrivilegedAction<Void>() {
267      @Override public Void run() {
268        try {
269          doPut();
270        } catch (IOException e) {
271          Assert.fail("Exception:" + e.getMessage());
272        }
273        return null;
274      }
275    });
276    waitForUsersMetrics(1);
277    long writeMetaMetricForUserFoo =
278        getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
279    long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
280    userBar.runAs(new PrivilegedAction<Void>() {
281      @Override public Void run() {
282        try {
283          doGet();
284        } catch (IOException e) {
285          Assert.fail("Exception:" + e.getMessage());
286        }
287        return null;
288      }
289    });
290    waitForUsersMetrics(2);
291    long readMetaMetricForUserBar =
292        getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
293    long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
294    userTest.runAs(new PrivilegedAction<Void>() {
295      @Override public Void run() {
296        try {
297          Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
298          for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
299            Assert.fail("Should have filtered all rows");
300          }
301        } catch (IOException e) {
302          Assert.fail("Exception:" + e.getMessage());
303        }
304        return null;
305      }
306    });
307    waitForUsersMetrics(3);
308    long filteredMetaReqeustForTestUser =
309        getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
310    Map<byte[], UserMetrics> userMap =
311        ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
312            .iterator().next().getUserMetrics();
313    for (byte[] user : userMap.keySet()) {
314      switch (Bytes.toString(user)) {
315        case "FOO_USER_METRIC_TEST":
316          Assert.assertEquals(1,
317              userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
318          break;
319        case "BAR_USER_METRIC_TEST":
320          Assert
321              .assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
322          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
323          break;
324        case "TEST_USER_METRIC_TEST":
325          Assert.assertEquals(1,
326              userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
327          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
328          break;
329        default:
330          //current user
331          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
332              Bytes.toString(user));
333          //Read/write count because of Meta operations
334          Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
335          break;
336      }
337    }
338    UTIL.deleteTable(TABLE_NAME);
339  }
340
341  private RegionMetrics getMetaMetrics() throws IOException {
342    for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
343        .getLiveServerMetrics().values()) {
344      RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
345          .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
346      if (metaMetrics != null) {
347        return metaMetrics;
348      }
349    }
350    Assert.fail("Should have find meta metrics");
351    return null;
352  }
353
354  private void waitForUsersMetrics(int noOfUsers) throws Exception {
355    //Sleep for metrics to get updated on master
356    Thread.sleep(5000);
357    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
358      @Override public boolean evaluate() throws Exception {
359        Map<byte[], UserMetrics> metrics =
360            ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
361                .iterator().next().getUserMetrics();
362        Assert.assertNotNull(metrics);
363        //including current user + noOfUsers
364        return metrics.keySet().size() > noOfUsers;
365      }
366    });
367  }
368
369  private void doPut() throws IOException {
370    try (Connection conn = createConnection(UTIL.getConfiguration())) {
371      Table table = conn.getTable(TABLE_NAME);
372      table
373        .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
374    }
375  }
376
377  private void doGet() throws IOException {
378    try (Connection conn = createConnection(UTIL.getConfiguration())) {
379      Table table = conn.getTable(TABLE_NAME);
380      table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
381    }
382  }
383
384  private Connection createConnection(Configuration conf) throws IOException {
385    User user = UserProvider.instantiate(conf).getCurrent();
386    return ConnectionFactory.createConnection(conf, user);
387  }
388
389  @Test
390  public void testOtherStatusInfos() throws Exception {
391    EnumSet<Option> options =
392        EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION,
393                   Option.CLUSTER_ID, Option.BALANCER_ON);
394    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
395    Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size());
396    Assert.assertNotNull(metrics.getHBaseVersion());
397    Assert.assertNotNull(metrics.getClusterId());
398    Assert.assertTrue(metrics.getAverageLoad() == 0.0);
399    Assert.assertNotNull(metrics.getBalancerOn());
400  }
401
402  @AfterClass
403  public static void tearDownAfterClass() throws Exception {
404    if (ADMIN != null) {
405      ADMIN.close();
406    }
407    UTIL.shutdownMiniCluster();
408  }
409
410  @Test
411  public void testObserver() throws IOException {
412    int preCount = MyObserver.PRE_COUNT.get();
413    int postCount = MyObserver.POST_COUNT.get();
414    Assert.assertTrue(ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream()
415        .anyMatch(s -> s.equals(MyObserver.class.getSimpleName())));
416    Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get());
417    Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get());
418  }
419
420  private static void insertData(final TableName tableName, int startRow, int rowCount)
421      throws IOException {
422    Table t = UTIL.getConnection().getTable(tableName);
423    Put p;
424    for (int i = 0; i < rowCount; i++) {
425      p = new Put(Bytes.toBytes("" + (startRow + i)));
426      p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i));
427      t.put(p);
428    }
429  }
430
431  public static class MyObserver implements MasterCoprocessor, MasterObserver {
432    private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
433    private static final AtomicInteger POST_COUNT = new AtomicInteger(0);
434
435    @Override public Optional<MasterObserver> getMasterObserver() {
436      return Optional.of(this);
437    }
438
439    @Override public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx)
440        throws IOException {
441      PRE_COUNT.incrementAndGet();
442    }
443
444    @Override public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx,
445        ClusterMetrics metrics) throws IOException {
446      POST_COUNT.incrementAndGet();
447    }
448  }
449}