001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.coprocessor; 021 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collections; 029import java.util.HashMap; 030import java.util.Hashtable; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Random; 035import java.util.Set; 036 037import javax.management.MBeanAttributeInfo; 038import javax.management.MBeanInfo; 039import javax.management.MBeanServerConnection; 040import javax.management.ObjectInstance; 041import javax.management.ObjectName; 042import javax.management.remote.JMXConnector; 043import javax.management.remote.JMXConnectorFactory; 044 045import org.apache.hadoop.conf.Configuration; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.JMXListener; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 051import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 052import org.apache.hadoop.hbase.client.Get; 053import org.apache.hadoop.hbase.client.Put; 054import org.apache.hadoop.hbase.client.Table; 055import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 056import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 057import org.apache.hadoop.hbase.testclassification.MediumTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.hamcrest.CustomTypeSafeMatcher; 060import org.hamcrest.Matcher; 061import org.hamcrest.core.AllOf; 062import org.junit.AfterClass; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Test; 066import org.junit.experimental.categories.Category; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070@Category({ CoprocessorTests.class, MediumTests.class }) 071public class TestMetaTableMetrics { 072 073 @ClassRule 074 public static final HBaseClassTestRule CLASS_RULE = 075 HBaseClassTestRule.forClass(TestMetaTableMetrics.class); 076 private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class); 077 078 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 079 private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne"); 080 private static final byte[] FAMILY = Bytes.toBytes("f"); 081 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 082 private static final ColumnFamilyDescriptor CFD = 083 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build(); 084 private static final int NUM_ROWS = 5; 085 private static final String value = "foo"; 086 private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_"; 087 private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES = 088 Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate"); 089 private static int connectorPort = 61120; 090 091 private final byte[] cf = Bytes.toBytes("info"); 092 private final byte[] col = Bytes.toBytes("any"); 093 private byte[] tablename; 094 private final int nthreads = 20; 095 096 @BeforeClass 097 public static void setupBeforeClass() throws Exception { 098 Configuration conf = UTIL.getConfiguration(); 099 // Set system coprocessor so it can be applied to meta regions 100 UTIL.getConfiguration().set("hbase.coprocessor.region.classes", 101 MetaTableMetrics.class.getName()); 102 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName()); 103 Random rand = new Random(); 104 for (int i = 0; i < 10; i++) { 105 do { 106 int sign = i % 2 == 0 ? 1 : -1; 107 connectorPort += sign * rand.nextInt(100); 108 } while (!HBaseTestingUtility.available(connectorPort)); 109 try { 110 conf.setInt("regionserver.rmi.registry.port", connectorPort); 111 UTIL.startMiniCluster(1); 112 break; 113 } catch (Exception e) { 114 LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e); 115 try { 116 // this is to avoid "IllegalStateException: A mini-cluster is already running" 117 UTIL.shutdownMiniCluster(); 118 } catch (Exception ex) { 119 LOG.debug("Encountered exception shutting down cluster", ex); 120 } 121 } 122 } 123 } 124 125 @AfterClass 126 public static void tearDown() throws Exception { 127 UTIL.shutdownMiniCluster(); 128 } 129 130 // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single 131 // client: 9 metrics 132 // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one 133 // table, there should 134 // be 5 MetaTable_table_<TableName>_request attributes, such as: 135 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count 136 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate 137 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate 138 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate 139 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate 140 @Test 141 public void testMetaTableMetricsInJmx() throws Exception { 142 UTIL.getAdmin() 143 .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build()); 144 writeData(NAME1); 145 UTIL.deleteTable(NAME1); 146 147 UTIL.waitFor(30000, 2000, true, () -> { 148 Map<String, Double> jmxMetrics = readMetaTableJmxMetrics(); 149 boolean allMetricsFound = AllOf.allOf( 150 containsPositiveJmxAttributesFor("MetaTable_get_request"), 151 containsPositiveJmxAttributesFor("MetaTable_put_request"), 152 containsPositiveJmxAttributesFor("MetaTable_delete_request"), 153 containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"), 154 containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"), 155 containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"), 156 containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"), 157 containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"), 158 containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request") 159 ).matches(jmxMetrics); 160 161 if (allMetricsFound) { 162 LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics); 163 } else { 164 LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics); 165 } 166 return allMetricsFound; 167 }); 168 } 169 170 @Test 171 public void testConcurrentAccess() { 172 try { 173 tablename = Bytes.toBytes("hbase:meta"); 174 int numRows = 3000; 175 int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); 176 putData(numRows); 177 Thread.sleep(2000); 178 int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); 179 assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); 180 getData(numRows); 181 } catch (InterruptedException e) { 182 LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage()); 183 fail(); 184 } catch (IOException e) { 185 LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage()); 186 fail(); 187 } 188 } 189 190 private void writeData(TableName tableName) throws IOException { 191 try (Table t = UTIL.getConnection().getTable(tableName)) { 192 List<Put> puts = new ArrayList<>(NUM_ROWS); 193 for (int i = 0; i < NUM_ROWS; i++) { 194 Put p = new Put(Bytes.toBytes(i + 1)); 195 p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value)); 196 puts.add(p); 197 } 198 t.put(puts); 199 } 200 } 201 202 private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) { 203 return new CustomTypeSafeMatcher<Map<String, Double>>( 204 "failed to find all the 5 positive JMX attributes for: " + regexp) { 205 206 @Override 207 protected boolean matchesSafely(final Map<String, Double> values) { 208 for (String key : values.keySet()) { 209 for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) { 210 if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) { 211 return true; 212 } 213 } 214 } 215 return false; 216 } 217 }; 218 } 219 220 /** 221 * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX 222 * @throws IOException when fails to retrieve jmx metrics. 223 */ 224 private Map<String, Double> readMetaTableJmxMetrics() throws IOException { 225 JMXConnector connector = null; 226 ObjectName target = null; 227 MBeanServerConnection mb = null; 228 try { 229 connector = 230 JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort)); 231 mb = connector.getMBeanServerConnection(); 232 233 @SuppressWarnings("JdkObsolete") 234 Hashtable<String, String> pairs = new Hashtable<>(); 235 pairs.put("service", "HBase"); 236 pairs.put("name", "RegionServer"); 237 pairs.put("sub", 238 "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics"); 239 target = new ObjectName("Hadoop", pairs); 240 MBeanInfo beanInfo = mb.getMBeanInfo(target); 241 242 Map<String, Double> existingAttrs = new HashMap<>(); 243 for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) { 244 Object value = mb.getAttribute(target, attrInfo.getName()); 245 if (attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX) 246 && value instanceof Number) { 247 existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString())); 248 } 249 } 250 LOG.info("MBean Found: {}", target); 251 return existingAttrs; 252 } catch (Exception e) { 253 LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e); 254 if (mb != null) { 255 Set<ObjectInstance> instances = mb.queryMBeans(null, null); 256 Iterator<ObjectInstance> iterator = instances.iterator(); 257 LOG.debug("All the MBeans we found:"); 258 while (iterator.hasNext()) { 259 ObjectInstance instance = iterator.next(); 260 LOG.debug("Class and object name: {} [{}]", instance.getClassName(), 261 instance.getObjectName()); 262 } 263 } 264 } finally { 265 if (connector != null) { 266 try { 267 connector.close(); 268 } catch (Exception e) { 269 e.printStackTrace(); 270 } 271 } 272 } 273 return Collections.emptyMap(); 274 } 275 276 private void putData(int nrows) throws InterruptedException { 277 LOG.info("Putting {} rows in hbase:meta", nrows); 278 Thread[] threads = new Thread[nthreads]; 279 for (int i = 1; i <= nthreads; i++) { 280 threads[i - 1] = new PutThread(1, nrows); 281 } 282 startThreadsAndWaitToJoin(threads); 283 } 284 285 private void getData(int nrows) throws InterruptedException { 286 LOG.info("Getting {} rows from hbase:meta", nrows); 287 Thread[] threads = new Thread[nthreads]; 288 for (int i = 1; i <= nthreads; i++) { 289 threads[i - 1] = new GetThread(1, nrows); 290 } 291 startThreadsAndWaitToJoin(threads); 292 } 293 294 private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { 295 for (int i = 1; i <= nthreads; i++) { 296 threads[i - 1].start(); 297 } 298 for (int i = 1; i <= nthreads; i++) { 299 threads[i - 1].join(); 300 } 301 } 302 303 private class PutThread extends Thread { 304 int start; 305 int end; 306 307 PutThread(int start, int end) { 308 this.start = start; 309 this.end = end; 310 } 311 312 @Override 313 public void run() { 314 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 315 for (int i = start; i <= end; i++) { 316 Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 317 p.addColumn(cf, col, Bytes.toBytes("Value" + i)); 318 table.put(p); 319 } 320 } catch (IOException e) { 321 LOG.warn("Caught IOException while PutThread operation", e); 322 } 323 } 324 } 325 326 private class GetThread extends Thread { 327 int start; 328 int end; 329 330 GetThread(int start, int end) { 331 this.start = start; 332 this.end = end; 333 } 334 335 @Override 336 public void run() { 337 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 338 for (int i = start; i <= end; i++) { 339 Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 340 table.get(get); 341 } 342 } catch (IOException e) { 343 LOG.warn("Caught IOException while GetThread operation", e); 344 } 345 } 346 } 347}