001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.coprocessor;
021
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.Hashtable;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036
037import javax.management.MBeanAttributeInfo;
038import javax.management.MBeanInfo;
039import javax.management.MBeanServerConnection;
040import javax.management.ObjectInstance;
041import javax.management.ObjectName;
042import javax.management.remote.JMXConnector;
043import javax.management.remote.JMXConnectorFactory;
044
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.hbase.HBaseClassTestRule;
047import org.apache.hadoop.hbase.HBaseTestingUtility;
048import org.apache.hadoop.hbase.JMXListener;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
056import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.hamcrest.CustomTypeSafeMatcher;
060import org.hamcrest.Matcher;
061import org.hamcrest.core.AllOf;
062import org.junit.AfterClass;
063import org.junit.BeforeClass;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@Category({ CoprocessorTests.class, MediumTests.class })
071public class TestMetaTableMetrics {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075      HBaseClassTestRule.forClass(TestMetaTableMetrics.class);
076  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class);
077
078  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
079  private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne");
080  private static final byte[] FAMILY = Bytes.toBytes("f");
081  private static final byte[] QUALIFIER = Bytes.toBytes("q");
082  private static final ColumnFamilyDescriptor CFD =
083      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build();
084  private static final int NUM_ROWS = 5;
085  private static final String value = "foo";
086  private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_";
087  private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES =
088      Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate");
089  private static int connectorPort = 61120;
090
091  private final byte[] cf = Bytes.toBytes("info");
092  private final byte[] col = Bytes.toBytes("any");
093  private byte[] tablename;
094  private final int nthreads = 20;
095
096  @BeforeClass
097  public static void setupBeforeClass() throws Exception {
098    Configuration conf = UTIL.getConfiguration();
099    // Set system coprocessor so it can be applied to meta regions
100    UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
101      MetaTableMetrics.class.getName());
102    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
103    Random rand = new Random();
104    for (int i = 0; i < 10; i++) {
105      do {
106        int sign = i % 2 == 0 ? 1 : -1;
107        connectorPort += sign * rand.nextInt(100);
108      } while (!HBaseTestingUtility.available(connectorPort));
109      try {
110        conf.setInt("regionserver.rmi.registry.port", connectorPort);
111        UTIL.startMiniCluster(1);
112        break;
113      } catch (Exception e) {
114        LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e);
115        try {
116          // this is to avoid "IllegalStateException: A mini-cluster is already running"
117          UTIL.shutdownMiniCluster();
118        } catch (Exception ex) {
119          LOG.debug("Encountered exception shutting down cluster", ex);
120        }
121      }
122    }
123  }
124
125  @AfterClass
126  public static void tearDown() throws Exception {
127    UTIL.shutdownMiniCluster();
128  }
129
  // Verifies that meta table metrics exist in JMX. With one table (one region) and a single
  // client, 9 metrics are generated, and each metric should produce 5 JMX attributes. For
  // example, for one table there should be 5 MetaTable_table_<TableName>_request attributes:
  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count
  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
140  @Test
141  public void testMetaTableMetricsInJmx() throws Exception {
142    UTIL.getAdmin()
143        .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build());
144    writeData(NAME1);
145    UTIL.deleteTable(NAME1);
146
147    UTIL.waitFor(30000, 2000, true, () -> {
148      Map<String, Double> jmxMetrics = readMetaTableJmxMetrics();
149      boolean allMetricsFound = AllOf.allOf(
150        containsPositiveJmxAttributesFor("MetaTable_get_request"),
151        containsPositiveJmxAttributesFor("MetaTable_put_request"),
152        containsPositiveJmxAttributesFor("MetaTable_delete_request"),
153        containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"),
154        containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"),
155        containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"),
156        containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"),
157        containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"),
158        containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request")
159      ).matches(jmxMetrics);
160
161      if (allMetricsFound) {
162        LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics);
163      } else {
164        LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics);
165      }
166      return allMetricsFound;
167    });
168  }
169
170  @Test
171  public void testConcurrentAccess() {
172    try {
173      tablename = Bytes.toBytes("hbase:meta");
174      int numRows = 3000;
175      int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
176      putData(numRows);
177      Thread.sleep(2000);
178      int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
179      assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
180      getData(numRows);
181    } catch (InterruptedException e) {
182      LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage());
183      fail();
184    } catch (IOException e) {
185      LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage());
186      fail();
187    }
188  }
189
190  private void writeData(TableName tableName) throws IOException {
191    try (Table t = UTIL.getConnection().getTable(tableName)) {
192      List<Put> puts = new ArrayList<>(NUM_ROWS);
193      for (int i = 0; i < NUM_ROWS; i++) {
194        Put p = new Put(Bytes.toBytes(i + 1));
195        p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
196        puts.add(p);
197      }
198      t.put(puts);
199    }
200  }
201
202  private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) {
203    return new CustomTypeSafeMatcher<Map<String, Double>>(
204        "failed to find all the 5 positive JMX attributes for: " + regexp) {
205
206      @Override
207      protected boolean matchesSafely(final Map<String, Double> values) {
208        for (String key : values.keySet()) {
209          for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) {
210            if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) {
211              return true;
212            }
213          }
214        }
215        return false;
216      }
217    };
218  }
219
220  /**
221   * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
222   * @throws IOException when fails to retrieve jmx metrics.
223   */
224  private Map<String, Double> readMetaTableJmxMetrics() throws IOException {
225    JMXConnector connector = null;
226    ObjectName target = null;
227    MBeanServerConnection mb = null;
228    try {
229      connector =
230          JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
231      mb = connector.getMBeanServerConnection();
232
233      @SuppressWarnings("JdkObsolete")
234      Hashtable<String, String> pairs = new Hashtable<>();
235      pairs.put("service", "HBase");
236      pairs.put("name", "RegionServer");
237      pairs.put("sub",
238        "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics");
239      target = new ObjectName("Hadoop", pairs);
240      MBeanInfo beanInfo = mb.getMBeanInfo(target);
241
242      Map<String, Double> existingAttrs = new HashMap<>();
243      for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
244        Object value = mb.getAttribute(target, attrInfo.getName());
245        if (attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX)
246            && value instanceof Number) {
247          existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString()));
248        }
249      }
250      LOG.info("MBean Found: {}", target);
251      return existingAttrs;
252    } catch (Exception e) {
253      LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e);
254      if (mb != null) {
255        Set<ObjectInstance> instances = mb.queryMBeans(null, null);
256        Iterator<ObjectInstance> iterator = instances.iterator();
257        LOG.debug("All the MBeans we found:");
258        while (iterator.hasNext()) {
259          ObjectInstance instance = iterator.next();
260          LOG.debug("Class and object name: {} [{}]", instance.getClassName(),
261                    instance.getObjectName());
262        }
263      }
264    } finally {
265      if (connector != null) {
266        try {
267          connector.close();
268        } catch (Exception e) {
269          e.printStackTrace();
270        }
271      }
272    }
273    return Collections.emptyMap();
274  }
275
276  private void putData(int nrows) throws InterruptedException {
277    LOG.info("Putting {} rows in hbase:meta", nrows);
278    Thread[] threads = new Thread[nthreads];
279    for (int i = 1; i <= nthreads; i++) {
280      threads[i - 1] = new PutThread(1, nrows);
281    }
282    startThreadsAndWaitToJoin(threads);
283  }
284
285  private void getData(int nrows) throws InterruptedException {
286    LOG.info("Getting {} rows from hbase:meta", nrows);
287    Thread[] threads = new Thread[nthreads];
288    for (int i = 1; i <= nthreads; i++) {
289      threads[i - 1] = new GetThread(1, nrows);
290    }
291    startThreadsAndWaitToJoin(threads);
292  }
293
294  private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
295    for (int i = 1; i <= nthreads; i++) {
296      threads[i - 1].start();
297    }
298    for (int i = 1; i <= nthreads; i++) {
299      threads[i - 1].join();
300    }
301  }
302
303  private class PutThread extends Thread {
304    int start;
305    int end;
306
307    PutThread(int start, int end) {
308      this.start = start;
309      this.end = end;
310    }
311
312    @Override
313    public void run() {
314      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
315        for (int i = start; i <= end; i++) {
316          Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
317          p.addColumn(cf, col, Bytes.toBytes("Value" + i));
318          table.put(p);
319        }
320      } catch (IOException e) {
321        LOG.warn("Caught IOException while PutThread operation", e);
322      }
323    }
324  }
325
326  private class GetThread extends Thread {
327    int start;
328    int end;
329
330    GetThread(int start, int end) {
331      this.start = start;
332      this.end = end;
333    }
334
335    @Override
336    public void run() {
337      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
338        for (int i = start; i <= end; i++) {
339          Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
340          table.get(get);
341        }
342      } catch (IOException e) {
343        LOG.warn("Caught IOException while GetThread operation", e);
344      }
345    }
346  }
347}