001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertArrayEquals;
021import static org.junit.Assert.assertTrue;
022import static org.junit.Assert.fail;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.HashMap;
029import java.util.Hashtable;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Random;
034import java.util.Set;
035import java.util.concurrent.ThreadLocalRandom;
036import javax.management.MBeanAttributeInfo;
037import javax.management.MBeanInfo;
038import javax.management.MBeanServerConnection;
039import javax.management.ObjectInstance;
040import javax.management.ObjectName;
041import javax.management.remote.JMXConnector;
042import javax.management.remote.JMXConnectorFactory;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseTestingUtility;
046import org.apache.hadoop.hbase.JMXListener;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.hamcrest.CustomTypeSafeMatcher;
058import org.hamcrest.Matcher;
059import org.hamcrest.core.AllOf;
060import org.junit.AfterClass;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068@Category({ CoprocessorTests.class, LargeTests.class })
069public class TestMetaTableMetrics {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestMetaTableMetrics.class);
074  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class);
075
076  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
077  private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne");
078  private static final byte[] FAMILY = Bytes.toBytes("f");
079  private static final byte[] QUALIFIER = Bytes.toBytes("q");
080  private static final ColumnFamilyDescriptor CFD =
081    ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build();
082  private static final int NUM_ROWS = 5;
083  private static final String value = "foo";
084  private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_";
085  private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES =
086    Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate");
087  private static int connectorPort = 61120;
088
089  private final byte[] cf = Bytes.toBytes("info");
090  private final byte[] col = Bytes.toBytes("any");
091  private byte[] tablename;
092  private final int nthreads = 20;
093
094  @BeforeClass
095  public static void setupBeforeClass() throws Exception {
096    Configuration conf = UTIL.getConfiguration();
097    // Set system coprocessor so it can be applied to meta regions
098    UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
099      MetaTableMetrics.class.getName());
100    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
101    Random rand = ThreadLocalRandom.current();
102    for (int i = 0; i < 10; i++) {
103      do {
104        int sign = i % 2 == 0 ? 1 : -1;
105        connectorPort += sign * rand.nextInt(100);
106      } while (!HBaseTestingUtility.available(connectorPort));
107      try {
108        conf.setInt("regionserver.rmi.registry.port", connectorPort);
109        UTIL.startMiniCluster(1);
110        break;
111      } catch (Exception e) {
112        LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e);
113        try {
114          // this is to avoid "IllegalStateException: A mini-cluster is already running"
115          UTIL.shutdownMiniCluster();
116        } catch (Exception ex) {
117          LOG.debug("Encountered exception shutting down cluster", ex);
118        }
119      }
120    }
121  }
122
123  @AfterClass
124  public static void tearDown() throws Exception {
125    UTIL.shutdownMiniCluster();
126  }
127
128  // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single
129  // client: 9 metrics
130  // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one
131  // table, there should
132  // be 5 MetaTable_table_<TableName>_request attributes, such as:
133  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count
134  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
135  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
136  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
137  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
138  @Test
139  public void testMetaTableMetricsInJmx() throws Exception {
140    UTIL.getAdmin()
141      .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build());
142    assertTrue(UTIL.getAdmin().isTableEnabled(NAME1));
143    readWriteData(NAME1);
144    UTIL.deleteTable(NAME1);
145
146    UTIL.waitFor(30000, 2000, true, () -> {
147      Map<String, Double> jmxMetrics = readMetaTableJmxMetrics();
148      boolean allMetricsFound = AllOf
149        .allOf(containsPositiveJmxAttributesFor("MetaTable_get_request"),
150          containsPositiveJmxAttributesFor("MetaTable_put_request"),
151          containsPositiveJmxAttributesFor("MetaTable_delete_request"),
152          containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"),
153          containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"),
154          containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"),
155          containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"),
156          containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"),
157          containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request"))
158        .matches(jmxMetrics);
159
160      if (allMetricsFound) {
161        LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics);
162      } else {
163        LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics);
164      }
165      return allMetricsFound;
166    });
167  }
168
169  @Test
170  public void testConcurrentAccess() {
171    try {
172      tablename = Bytes.toBytes("hbase:meta");
173      int numRows = 3000;
174      int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
175      putData(numRows);
176      Thread.sleep(2000);
177      int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
178      assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
179      getData(numRows);
180    } catch (InterruptedException e) {
181      LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage());
182      fail();
183    } catch (IOException e) {
184      LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage());
185      fail();
186    }
187  }
188
189  private void readWriteData(TableName tableName) throws IOException {
190    try (Table t = UTIL.getConnection().getTable(tableName)) {
191      List<Put> puts = new ArrayList<>(NUM_ROWS);
192      for (int i = 0; i < NUM_ROWS; i++) {
193        Put p = new Put(Bytes.toBytes(i + 1));
194        p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
195        puts.add(p);
196      }
197      t.put(puts);
198      for (int i = 0; i < NUM_ROWS; i++) {
199        Get get = new Get(Bytes.toBytes(i + 1));
200        assertArrayEquals(Bytes.toBytes(value), t.get(get).getValue(FAMILY, QUALIFIER));
201      }
202    }
203  }
204
205  private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) {
206    return new CustomTypeSafeMatcher<Map<String, Double>>(
207      "failed to find all the 5 positive JMX attributes for: " + regexp) {
208
209      @Override
210      protected boolean matchesSafely(final Map<String, Double> values) {
211        for (String key : values.keySet()) {
212          for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) {
213            if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) {
214              return true;
215            }
216          }
217        }
218        return false;
219      }
220    };
221  }
222
223  /**
224   * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
225   * @throws IOException when fails to retrieve jmx metrics.
226   */
227  private Map<String, Double> readMetaTableJmxMetrics() throws IOException {
228    JMXConnector connector = null;
229    ObjectName target = null;
230    MBeanServerConnection mb = null;
231    try {
232      connector =
233        JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
234      mb = connector.getMBeanServerConnection();
235
236      @SuppressWarnings("JdkObsolete")
237      Hashtable<String, String> pairs = new Hashtable<>();
238      pairs.put("service", "HBase");
239      pairs.put("name", "RegionServer");
240      pairs.put("sub",
241        "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics");
242      target = new ObjectName("Hadoop", pairs);
243      MBeanInfo beanInfo = mb.getMBeanInfo(target);
244
245      Map<String, Double> existingAttrs = new HashMap<>();
246      for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
247        Object value = mb.getAttribute(target, attrInfo.getName());
248        if (
249          attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX) && value instanceof Number
250        ) {
251          existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString()));
252        }
253      }
254      LOG.info("MBean Found: {}", target);
255      return existingAttrs;
256    } catch (Exception e) {
257      LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e);
258      if (mb != null) {
259        Set<ObjectInstance> instances = mb.queryMBeans(null, null);
260        Iterator<ObjectInstance> iterator = instances.iterator();
261        LOG.debug("All the MBeans we found:");
262        while (iterator.hasNext()) {
263          ObjectInstance instance = iterator.next();
264          LOG.debug("Class and object name: {} [{}]", instance.getClassName(),
265            instance.getObjectName());
266        }
267      }
268    } finally {
269      if (connector != null) {
270        try {
271          connector.close();
272        } catch (Exception e) {
273          e.printStackTrace();
274        }
275      }
276    }
277    return Collections.emptyMap();
278  }
279
280  private void putData(int nrows) throws InterruptedException {
281    LOG.info("Putting {} rows in hbase:meta", nrows);
282    Thread[] threads = new Thread[nthreads];
283    for (int i = 1; i <= nthreads; i++) {
284      threads[i - 1] = new PutThread(1, nrows);
285    }
286    startThreadsAndWaitToJoin(threads);
287  }
288
289  private void getData(int nrows) throws InterruptedException {
290    LOG.info("Getting {} rows from hbase:meta", nrows);
291    Thread[] threads = new Thread[nthreads];
292    for (int i = 1; i <= nthreads; i++) {
293      threads[i - 1] = new GetThread(1, nrows);
294    }
295    startThreadsAndWaitToJoin(threads);
296  }
297
298  private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
299    for (int i = 1; i <= nthreads; i++) {
300      threads[i - 1].start();
301    }
302    for (int i = 1; i <= nthreads; i++) {
303      threads[i - 1].join();
304    }
305  }
306
307  private class PutThread extends Thread {
308    int start;
309    int end;
310
311    PutThread(int start, int end) {
312      this.start = start;
313      this.end = end;
314    }
315
316    @Override
317    public void run() {
318      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
319        for (int i = start; i <= end; i++) {
320          Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
321          p.addColumn(cf, col, Bytes.toBytes("Value" + i));
322          table.put(p);
323        }
324      } catch (IOException e) {
325        LOG.warn("Caught IOException while PutThread operation", e);
326      }
327    }
328  }
329
330  private class GetThread extends Thread {
331    int start;
332    int end;
333
334    GetThread(int start, int end) {
335      this.start = start;
336      this.end = end;
337    }
338
339    @Override
340    public void run() {
341      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
342        for (int i = start; i <= end; i++) {
343          Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
344          table.get(get);
345        }
346      } catch (IOException e) {
347        LOG.warn("Caught IOException while GetThread operation", e);
348      }
349    }
350  }
351}