001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.security.PrivilegedAction;
022import java.util.EnumSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ClusterMetrics.Option;
031import org.apache.hadoop.hbase.Waiter.Predicate;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.AsyncAdmin;
034import org.apache.hadoop.hbase.client.AsyncConnection;
035import org.apache.hadoop.hbase.client.Connection;
036import org.apache.hadoop.hbase.client.ConnectionFactory;
037import org.apache.hadoop.hbase.client.Get;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.RegionInfoBuilder;
040import org.apache.hadoop.hbase.client.RegionStatesCount;
041import org.apache.hadoop.hbase.client.Result;
042import org.apache.hadoop.hbase.client.Scan;
043import org.apache.hadoop.hbase.client.Table;
044import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
045import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
047import org.apache.hadoop.hbase.coprocessor.MasterObserver;
048import org.apache.hadoop.hbase.coprocessor.ObserverContext;
049import org.apache.hadoop.hbase.filter.FilterAllFilter;
050import org.apache.hadoop.hbase.master.HMaster;
051import org.apache.hadoop.hbase.regionserver.HRegionServer;
052import org.apache.hadoop.hbase.security.User;
053import org.apache.hadoop.hbase.security.UserProvider;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
057import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
058import org.junit.AfterClass;
059import org.junit.Assert;
060import org.junit.BeforeClass;
061import org.junit.ClassRule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064
065@Category(MediumTests.class)
066public class TestClientClusterMetrics {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestClientClusterMetrics.class);
071
072  private static HBaseTestingUtility UTIL;
073  private static Admin ADMIN;
074  private final static int SLAVES = 5;
075  private final static int MASTERS = 3;
076  private static MiniHBaseCluster CLUSTER;
077  private static HRegionServer DEAD;
078  private static final TableName TABLE_NAME = TableName.valueOf("test");
079  private static final byte[] CF = Bytes.toBytes("cf");
080
081
082  @BeforeClass
083  public static void setUpBeforeClass() throws Exception {
084    Configuration conf = HBaseConfiguration.create();
085    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
086    UTIL = new HBaseTestingUtility(conf);
087    StartMiniClusterOption option = StartMiniClusterOption.builder()
088        .numMasters(MASTERS).numRegionServers(SLAVES).numDataNodes(SLAVES).build();
089    UTIL.startMiniCluster(option);
090    CLUSTER = UTIL.getHBaseCluster();
091    CLUSTER.waitForActiveAndReadyMaster();
092    ADMIN = UTIL.getAdmin();
093    // Kill one region server
094    List<RegionServerThread> rsts = CLUSTER.getLiveRegionServerThreads();
095    RegionServerThread rst = rsts.get(rsts.size() - 1);
096    DEAD = rst.getRegionServer();
097    DEAD.stop("Test dead servers metrics");
098    while (rst.isAlive()) {
099      Thread.sleep(500);
100    }
101  }
102
103  @Test
104  public void testDefaults() throws Exception {
105    ClusterMetrics origin = ADMIN.getClusterMetrics();
106    ClusterMetrics defaults = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class));
107    Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
108    Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
109    Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
110    Assert.assertEquals(origin.getBackupMasterNames().size(),
111        defaults.getBackupMasterNames().size());
112    Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
113    Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
114    Assert.assertEquals(origin.getLiveServerMetrics().size(),
115        defaults.getLiveServerMetrics().size());
116    Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
117    Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
118    Assert.assertEquals(ADMIN.getRegionServers().size(), defaults.getServersName().size());
119  }
120
121  @Test
122  public void testAsyncClient() throws Exception {
123    try (AsyncConnection asyncConnect = ConnectionFactory.createAsyncConnection(
124      UTIL.getConfiguration()).get()) {
125      AsyncAdmin asyncAdmin = asyncConnect.getAdmin();
126      CompletableFuture<ClusterMetrics> originFuture =
127        asyncAdmin.getClusterMetrics();
128      CompletableFuture<ClusterMetrics> defaultsFuture =
129        asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class));
130      ClusterMetrics origin = originFuture.get();
131      ClusterMetrics defaults = defaultsFuture.get();
132      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
133      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
134      Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
135      Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
136      Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
137      Assert.assertEquals(origin.getBackupMasterNames().size(),
138        defaults.getBackupMasterNames().size());
139      Assert.assertEquals(origin.getDeadServerNames().size(), defaults.getDeadServerNames().size());
140      Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
141      Assert.assertEquals(origin.getLiveServerMetrics().size(),
142        defaults.getLiveServerMetrics().size());
143      Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
144      Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
145      origin.getTableRegionStatesCount().forEach(((tableName, regionStatesCount) -> {
146        RegionStatesCount defaultRegionStatesCount = defaults.getTableRegionStatesCount()
147          .get(tableName);
148        Assert.assertEquals(defaultRegionStatesCount, regionStatesCount);
149      }));
150    }
151  }
152
153  @Test
154  public void testLiveAndDeadServersStatus() throws Exception {
155    // Count the number of live regionservers
156    List<RegionServerThread> regionserverThreads = CLUSTER.getLiveRegionServerThreads();
157    int numRs = 0;
158    int len = regionserverThreads.size();
159    for (int i = 0; i < len; i++) {
160      if (regionserverThreads.get(i).isAlive()) {
161        numRs++;
162      }
163    }
164    // Depending on the (random) order of unit execution we may run this unit before the
165    // minicluster is fully up and recovered from the RS shutdown done during test init.
166    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
167      @Override
168      public boolean evaluate() throws Exception {
169        ClusterMetrics metrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
170        Assert.assertNotNull(metrics);
171        return metrics.getRegionCount() > 0;
172      }
173    });
174    // Retrieve live servers and dead servers info.
175    EnumSet<Option> options =
176        EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME);
177    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
178    Assert.assertNotNull(metrics);
179    // exclude a dead region server
180    Assert.assertEquals(SLAVES -1, numRs);
181    // live servers = nums of regionservers
182    // By default, HMaster don't carry any regions so it won't report its load.
183    // Hence, it won't be in the server list.
184    Assert.assertEquals(numRs, metrics.getLiveServerMetrics().size());
185    Assert.assertTrue(metrics.getRegionCount() > 0);
186    Assert.assertNotNull(metrics.getDeadServerNames());
187    Assert.assertEquals(1, metrics.getDeadServerNames().size());
188    ServerName deadServerName = metrics.getDeadServerNames().iterator().next();
189    Assert.assertEquals(DEAD.getServerName(), deadServerName);
190    Assert.assertNotNull(metrics.getServersName());
191    Assert.assertEquals(numRs, metrics.getServersName().size());
192  }
193
194  @Test
195  public void testRegionStatesCount() throws Exception {
196    Table table = UTIL.createTable(TABLE_NAME, CF);
197    table.put(new Put(Bytes.toBytes("k1"))
198      .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
199    table.put(new Put(Bytes.toBytes("k2"))
200      .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
201    table.put(new Put(Bytes.toBytes("k3"))
202      .addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3")));
203
204    ClusterMetrics metrics = ADMIN.getClusterMetrics();
205    Assert.assertEquals(metrics.getTableRegionStatesCount().size(), 3);
206    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
207      .getRegionsInTransition(), 0);
208    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
209      .getOpenRegions(), 1);
210    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
211      .getTotalRegions(), 1);
212    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
213      .getClosedRegions(), 0);
214    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME)
215      .getSplitRegions(), 0);
216    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
217      .getRegionsInTransition(), 0);
218    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
219      .getOpenRegions(), 1);
220    Assert.assertEquals(metrics.getTableRegionStatesCount().get(TABLE_NAME)
221      .getTotalRegions(), 1);
222
223    UTIL.deleteTable(TABLE_NAME);
224  }
225
226  @Test
227  public void testMasterAndBackupMastersStatus() throws Exception {
228    // get all the master threads
229    List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
230    int numActive = 0;
231    int activeIndex = 0;
232    ServerName activeName = null;
233    HMaster active = null;
234    for (int i = 0; i < masterThreads.size(); i++) {
235      if (masterThreads.get(i).getMaster().isActiveMaster()) {
236        numActive++;
237        activeIndex = i;
238        active = masterThreads.get(activeIndex).getMaster();
239        activeName = active.getServerName();
240      }
241    }
242    Assert.assertNotNull(active);
243    Assert.assertEquals(1, numActive);
244    Assert.assertEquals(MASTERS, masterThreads.size());
245    // Retrieve master and backup masters infos only.
246    EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS);
247    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
248    Assert.assertTrue(metrics.getMasterName().equals(activeName));
249    Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
250  }
251
252  @Test public void testUserMetrics() throws Exception {
253    Configuration conf = UTIL.getConfiguration();
254    User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
255    User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
256    User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
257    UTIL.createTable(TABLE_NAME, CF);
258    waitForUsersMetrics(0);
259    long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
260    userFoo.runAs(new PrivilegedAction<Void>() {
261      @Override public Void run() {
262        try {
263          doPut();
264        } catch (IOException e) {
265          Assert.fail("Exception:" + e.getMessage());
266        }
267        return null;
268      }
269    });
270    waitForUsersMetrics(1);
271    long writeMetaMetricForUserFoo =
272        getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
273    long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
274    userBar.runAs(new PrivilegedAction<Void>() {
275      @Override public Void run() {
276        try {
277          doGet();
278        } catch (IOException e) {
279          Assert.fail("Exception:" + e.getMessage());
280        }
281        return null;
282      }
283    });
284    waitForUsersMetrics(2);
285    long readMetaMetricForUserBar =
286        getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
287    long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
288    userTest.runAs(new PrivilegedAction<Void>() {
289      @Override public Void run() {
290        try {
291          Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
292          for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
293            Assert.fail("Should have filtered all rows");
294          }
295        } catch (IOException e) {
296          Assert.fail("Exception:" + e.getMessage());
297        }
298        return null;
299      }
300    });
301    waitForUsersMetrics(3);
302    long filteredMetaReqeustForTestUser =
303        getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
304    Map<byte[], UserMetrics> userMap =
305        ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
306            .iterator().next().getUserMetrics();
307    for (byte[] user : userMap.keySet()) {
308      switch (Bytes.toString(user)) {
309        case "FOO_USER_METRIC_TEST":
310          Assert.assertEquals(1,
311              userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
312          break;
313        case "BAR_USER_METRIC_TEST":
314          Assert
315              .assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
316          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
317          break;
318        case "TEST_USER_METRIC_TEST":
319          Assert.assertEquals(1,
320              userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
321          Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
322          break;
323        default:
324          //current user
325          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
326              Bytes.toString(user));
327          //Read/write count because of Meta operations
328          Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
329          break;
330      }
331    }
332    UTIL.deleteTable(TABLE_NAME);
333  }
334
335  private RegionMetrics getMetaMetrics() throws IOException {
336    for (ServerMetrics serverMetrics : ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
337        .getLiveServerMetrics().values()) {
338      RegionMetrics metaMetrics = serverMetrics.getRegionMetrics()
339          .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
340      if (metaMetrics != null) {
341        return metaMetrics;
342      }
343    }
344    Assert.fail("Should have find meta metrics");
345    return null;
346  }
347
348  private void waitForUsersMetrics(int noOfUsers) throws Exception {
349    //Sleep for metrics to get updated on master
350    Thread.sleep(5000);
351    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
352      @Override public boolean evaluate() throws Exception {
353        Map<byte[], UserMetrics> metrics =
354            ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
355                .iterator().next().getUserMetrics();
356        Assert.assertNotNull(metrics);
357        //including current user + noOfUsers
358        return metrics.keySet().size() > noOfUsers;
359      }
360    });
361  }
362
363  private void doPut() throws IOException {
364    try (Connection conn = createConnection(UTIL.getConfiguration())) {
365      Table table = conn.getTable(TABLE_NAME);
366      table
367        .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
368    }
369  }
370
371  private void doGet() throws IOException {
372    try (Connection conn = createConnection(UTIL.getConfiguration())) {
373      Table table = conn.getTable(TABLE_NAME);
374      table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
375    }
376  }
377
378  private Connection createConnection(Configuration conf) throws IOException {
379    User user = UserProvider.instantiate(conf).getCurrent();
380    return ConnectionFactory.createConnection(conf, user);
381  }
382
383  @Test
384  public void testOtherStatusInfos() throws Exception {
385    EnumSet<Option> options =
386        EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION,
387                   Option.CLUSTER_ID, Option.BALANCER_ON);
388    ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
389    Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size());
390    Assert.assertNotNull(metrics.getHBaseVersion());
391    Assert.assertNotNull(metrics.getClusterId());
392    Assert.assertTrue(metrics.getAverageLoad() == 0.0);
393    Assert.assertNotNull(metrics.getBalancerOn());
394  }
395
396  @AfterClass
397  public static void tearDownAfterClass() throws Exception {
398    if (ADMIN != null) {
399      ADMIN.close();
400    }
401    UTIL.shutdownMiniCluster();
402  }
403
404  @Test
405  public void testObserver() throws IOException {
406    int preCount = MyObserver.PRE_COUNT.get();
407    int postCount = MyObserver.POST_COUNT.get();
408    Assert.assertTrue(ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream()
409        .anyMatch(s -> s.equals(MyObserver.class.getSimpleName())));
410    Assert.assertEquals(preCount + 1, MyObserver.PRE_COUNT.get());
411    Assert.assertEquals(postCount + 1, MyObserver.POST_COUNT.get());
412  }
413
414  private static void insertData(final TableName tableName, int startRow, int rowCount)
415      throws IOException {
416    Table t = UTIL.getConnection().getTable(tableName);
417    Put p;
418    for (int i = 0; i < rowCount; i++) {
419      p = new Put(Bytes.toBytes("" + (startRow + i)));
420      p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i));
421      t.put(p);
422    }
423  }
424
425  public static class MyObserver implements MasterCoprocessor, MasterObserver {
426    private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
427    private static final AtomicInteger POST_COUNT = new AtomicInteger(0);
428
429    @Override public Optional<MasterObserver> getMasterObserver() {
430      return Optional.of(this);
431    }
432
433    @Override public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx)
434        throws IOException {
435      PRE_COUNT.incrementAndGet();
436    }
437
438    @Override public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> ctx,
439        ClusterMetrics metrics) throws IOException {
440      POST_COUNT.incrementAndGet();
441    }
442  }
443}