001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.coprocessor;
021
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.Hashtable;
032import java.util.Iterator;
033import java.util.List;
034import java.util.Map;
035import java.util.Random;
036import java.util.Set;
037import javax.management.MBeanAttributeInfo;
038import javax.management.MBeanInfo;
039import javax.management.MBeanServerConnection;
040import javax.management.ObjectInstance;
041import javax.management.ObjectName;
042import javax.management.remote.JMXConnector;
043import javax.management.remote.JMXConnectorFactory;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtility;
047import org.apache.hadoop.hbase.JMXListener;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Get;
052import org.apache.hadoop.hbase.client.Put;
053import org.apache.hadoop.hbase.client.Table;
054import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
055import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.hamcrest.CustomTypeSafeMatcher;
059import org.hamcrest.Matcher;
060import org.hamcrest.core.AllOf;
061import org.junit.AfterClass;
062import org.junit.BeforeClass;
063import org.junit.ClassRule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069@Category({ CoprocessorTests.class, LargeTests.class })
070public class TestMetaTableMetrics {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestMetaTableMetrics.class);
075  private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class);
076
077  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
078  private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne");
079  private static final byte[] FAMILY = Bytes.toBytes("f");
080  private static final byte[] QUALIFIER = Bytes.toBytes("q");
081  private static final ColumnFamilyDescriptor CFD =
082      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build();
083  private static final int NUM_ROWS = 5;
084  private static final String value = "foo";
085  private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_";
086  private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES =
087      Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate");
088  private static int connectorPort = 61120;
089
090  private final byte[] cf = Bytes.toBytes("info");
091  private final byte[] col = Bytes.toBytes("any");
092  private byte[] tablename;
093  private final int nthreads = 20;
094
095  @BeforeClass
096  public static void setupBeforeClass() throws Exception {
097    Configuration conf = UTIL.getConfiguration();
098    // Set system coprocessor so it can be applied to meta regions
099    UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
100      MetaTableMetrics.class.getName());
101    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName());
102    Random rand = new Random();
103    for (int i = 0; i < 10; i++) {
104      do {
105        int sign = i % 2 == 0 ? 1 : -1;
106        connectorPort += sign * rand.nextInt(100);
107      } while (!HBaseTestingUtility.available(connectorPort));
108      try {
109        conf.setInt("regionserver.rmi.registry.port", connectorPort);
110        UTIL.startMiniCluster(1);
111        break;
112      } catch (Exception e) {
113        LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e);
114        try {
115          // this is to avoid "IllegalStateException: A mini-cluster is already running"
116          UTIL.shutdownMiniCluster();
117        } catch (Exception ex) {
118          LOG.debug("Encountered exception shutting down cluster", ex);
119        }
120      }
121    }
122  }
123
124  @AfterClass
125  public static void tearDown() throws Exception {
126    UTIL.shutdownMiniCluster();
127  }
128
129  // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single
130  // client: 9 metrics
131  // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one
132  // table, there should
133  // be 5 MetaTable_table_<TableName>_request attributes, such as:
134  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count
135  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
136  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
137  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
138  // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
139  @Test
140  public void testMetaTableMetricsInJmx() throws Exception {
141    UTIL.getAdmin()
142        .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build());
143    assertTrue(UTIL.getAdmin().isTableEnabled(NAME1));
144    readWriteData(NAME1);
145    UTIL.deleteTable(NAME1);
146
147    UTIL.waitFor(30000, 2000, true, () -> {
148      Map<String, Double> jmxMetrics = readMetaTableJmxMetrics();
149      boolean allMetricsFound = AllOf.allOf(
150        containsPositiveJmxAttributesFor("MetaTable_get_request"),
151        containsPositiveJmxAttributesFor("MetaTable_put_request"),
152        containsPositiveJmxAttributesFor("MetaTable_delete_request"),
153        containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"),
154        containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"),
155        containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"),
156        containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"),
157        containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"),
158        containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request")
159      ).matches(jmxMetrics);
160
161      if (allMetricsFound) {
162        LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics);
163      } else {
164        LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics);
165      }
166      return allMetricsFound;
167    });
168  }
169
170  @Test
171  public void testConcurrentAccess() {
172    try {
173      tablename = Bytes.toBytes("hbase:meta");
174      int numRows = 3000;
175      int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
176      putData(numRows);
177      Thread.sleep(2000);
178      int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
179      assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
180      getData(numRows);
181    } catch (InterruptedException e) {
182      LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage());
183      fail();
184    } catch (IOException e) {
185      LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage());
186      fail();
187    }
188  }
189
190  private void readWriteData(TableName tableName) throws IOException {
191    try (Table t = UTIL.getConnection().getTable(tableName)) {
192      List<Put> puts = new ArrayList<>(NUM_ROWS);
193      for (int i = 0; i < NUM_ROWS; i++) {
194        Put p = new Put(Bytes.toBytes(i + 1));
195        p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
196        puts.add(p);
197      }
198      t.put(puts);
199      for (int i = 0; i < NUM_ROWS; i++) {
200        Get get = new Get(Bytes.toBytes(i + 1));
201        assertArrayEquals(Bytes.toBytes(value), t.get(get).getValue(FAMILY, QUALIFIER));
202      }
203    }
204  }
205
206  private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) {
207    return new CustomTypeSafeMatcher<Map<String, Double>>(
208        "failed to find all the 5 positive JMX attributes for: " + regexp) {
209
210      @Override
211      protected boolean matchesSafely(final Map<String, Double> values) {
212        for (String key : values.keySet()) {
213          for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) {
214            if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) {
215              return true;
216            }
217          }
218        }
219        return false;
220      }
221    };
222  }
223
224  /**
225   * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
226   * @throws IOException when fails to retrieve jmx metrics.
227   */
228  private Map<String, Double> readMetaTableJmxMetrics() throws IOException {
229    JMXConnector connector = null;
230    ObjectName target = null;
231    MBeanServerConnection mb = null;
232    try {
233      connector =
234          JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
235      mb = connector.getMBeanServerConnection();
236
237      @SuppressWarnings("JdkObsolete")
238      Hashtable<String, String> pairs = new Hashtable<>();
239      pairs.put("service", "HBase");
240      pairs.put("name", "RegionServer");
241      pairs.put("sub",
242        "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics");
243      target = new ObjectName("Hadoop", pairs);
244      MBeanInfo beanInfo = mb.getMBeanInfo(target);
245
246      Map<String, Double> existingAttrs = new HashMap<>();
247      for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
248        Object value = mb.getAttribute(target, attrInfo.getName());
249        if (attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX)
250            && value instanceof Number) {
251          existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString()));
252        }
253      }
254      LOG.info("MBean Found: {}", target);
255      return existingAttrs;
256    } catch (Exception e) {
257      LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e);
258      if (mb != null) {
259        Set<ObjectInstance> instances = mb.queryMBeans(null, null);
260        Iterator<ObjectInstance> iterator = instances.iterator();
261        LOG.debug("All the MBeans we found:");
262        while (iterator.hasNext()) {
263          ObjectInstance instance = iterator.next();
264          LOG.debug("Class and object name: {} [{}]", instance.getClassName(),
265                    instance.getObjectName());
266        }
267      }
268    } finally {
269      if (connector != null) {
270        try {
271          connector.close();
272        } catch (Exception e) {
273          e.printStackTrace();
274        }
275      }
276    }
277    return Collections.emptyMap();
278  }
279
280  private void putData(int nrows) throws InterruptedException {
281    LOG.info("Putting {} rows in hbase:meta", nrows);
282    Thread[] threads = new Thread[nthreads];
283    for (int i = 1; i <= nthreads; i++) {
284      threads[i - 1] = new PutThread(1, nrows);
285    }
286    startThreadsAndWaitToJoin(threads);
287  }
288
289  private void getData(int nrows) throws InterruptedException {
290    LOG.info("Getting {} rows from hbase:meta", nrows);
291    Thread[] threads = new Thread[nthreads];
292    for (int i = 1; i <= nthreads; i++) {
293      threads[i - 1] = new GetThread(1, nrows);
294    }
295    startThreadsAndWaitToJoin(threads);
296  }
297
298  private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
299    for (int i = 1; i <= nthreads; i++) {
300      threads[i - 1].start();
301    }
302    for (int i = 1; i <= nthreads; i++) {
303      threads[i - 1].join();
304    }
305  }
306
307  private class PutThread extends Thread {
308    int start;
309    int end;
310
311    PutThread(int start, int end) {
312      this.start = start;
313      this.end = end;
314    }
315
316    @Override
317    public void run() {
318      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
319        for (int i = start; i <= end; i++) {
320          Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
321          p.addColumn(cf, col, Bytes.toBytes("Value" + i));
322          table.put(p);
323        }
324      } catch (IOException e) {
325        LOG.warn("Caught IOException while PutThread operation", e);
326      }
327    }
328  }
329
330  private class GetThread extends Thread {
331    int start;
332    int end;
333
334    GetThread(int start, int end) {
335      this.start = start;
336      this.end = end;
337    }
338
339    @Override
340    public void run() {
341      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
342        for (int i = start; i <= end; i++) {
343          Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
344          table.get(get);
345        }
346      } catch (IOException e) {
347        LOG.warn("Caught IOException while GetThread operation", e);
348      }
349    }
350  }
351}