001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.Hashtable; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import java.util.concurrent.ThreadLocalRandom; 036import javax.management.MBeanAttributeInfo; 037import javax.management.MBeanInfo; 038import javax.management.MBeanServerConnection; 039import javax.management.ObjectInstance; 040import javax.management.ObjectName; 041import javax.management.remote.JMXConnector; 042import javax.management.remote.JMXConnectorFactory; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HBaseClassTestRule; 045import org.apache.hadoop.hbase.HBaseTestingUtility; 046import org.apache.hadoop.hbase.JMXListener; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.util.Bytes; 057import org.hamcrest.CustomTypeSafeMatcher; 058import org.hamcrest.Matcher; 059import org.hamcrest.core.AllOf; 060import org.junit.AfterClass; 061import org.junit.BeforeClass; 062import org.junit.ClassRule; 063import org.junit.Test; 064import org.junit.experimental.categories.Category; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068@Category({ CoprocessorTests.class, LargeTests.class }) 069public class TestMetaTableMetrics { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestMetaTableMetrics.class); 074 private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class); 075 076 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 077 private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne"); 078 private static final byte[] FAMILY = Bytes.toBytes("f"); 079 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 080 private static final ColumnFamilyDescriptor CFD = 081 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build(); 082 private static final int NUM_ROWS = 5; 083 private static final String value = "foo"; 084 private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_"; 085 private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES = 086 Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate"); 087 private static int connectorPort = 61120; 088 089 private final byte[] cf = Bytes.toBytes("info"); 090 private final byte[] col = Bytes.toBytes("any"); 091 private byte[] tablename; 092 private final int nthreads = 20; 093 094 @BeforeClass 095 public static void setupBeforeClass() throws Exception { 096 Configuration conf = UTIL.getConfiguration(); 097 // Set system coprocessor so it can be applied to meta regions 098 UTIL.getConfiguration().set("hbase.coprocessor.region.classes", 099 MetaTableMetrics.class.getName()); 100 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName()); 101 Random rand = ThreadLocalRandom.current(); 102 for (int i = 0; i < 10; i++) { 103 do { 104 int sign = i % 2 == 0 ? 1 : -1; 105 connectorPort += sign * rand.nextInt(100); 106 } while (!HBaseTestingUtility.available(connectorPort)); 107 try { 108 conf.setInt("regionserver.rmi.registry.port", connectorPort); 109 UTIL.startMiniCluster(1); 110 break; 111 } catch (Exception e) { 112 LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e); 113 try { 114 // this is to avoid "IllegalStateException: A mini-cluster is already running" 115 UTIL.shutdownMiniCluster(); 116 } catch (Exception ex) { 117 LOG.debug("Encountered exception shutting down cluster", ex); 118 } 119 } 120 } 121 } 122 123 @AfterClass 124 public static void tearDown() throws Exception { 125 UTIL.shutdownMiniCluster(); 126 } 127 128 // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single 129 // client: 9 metrics 130 // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one 131 // table, there should 132 // be 5 MetaTable_table_<TableName>_request attributes, such as: 133 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count 134 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate 135 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate 136 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate 137 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate 138 @Test 139 public void testMetaTableMetricsInJmx() throws Exception { 140 UTIL.getAdmin() 141 .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build()); 142 assertTrue(UTIL.getAdmin().isTableEnabled(NAME1)); 143 readWriteData(NAME1); 144 UTIL.deleteTable(NAME1); 145 146 UTIL.waitFor(30000, 2000, true, () -> { 147 Map<String, Double> jmxMetrics = readMetaTableJmxMetrics(); 148 boolean allMetricsFound = AllOf 149 .allOf(containsPositiveJmxAttributesFor("MetaTable_get_request"), 150 containsPositiveJmxAttributesFor("MetaTable_put_request"), 151 containsPositiveJmxAttributesFor("MetaTable_delete_request"), 152 containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"), 153 containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"), 154 containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"), 155 containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"), 156 containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"), 157 containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request")) 158 .matches(jmxMetrics); 159 160 if (allMetricsFound) { 161 LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics); 162 } else { 163 LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics); 164 } 165 return allMetricsFound; 166 }); 167 } 168 169 @Test 170 public void testConcurrentAccess() { 171 try { 172 tablename = Bytes.toBytes("hbase:meta"); 173 int numRows = 3000; 174 int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); 175 putData(numRows); 176 Thread.sleep(2000); 177 int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); 178 assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); 179 getData(numRows); 180 } catch (InterruptedException e) { 181 LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage()); 182 fail(); 183 } catch (IOException e) { 184 LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage()); 185 fail(); 186 } 187 } 188 189 private void readWriteData(TableName tableName) throws IOException { 190 try (Table t = UTIL.getConnection().getTable(tableName)) { 191 List<Put> puts = new ArrayList<>(NUM_ROWS); 192 for (int i = 0; i < NUM_ROWS; i++) { 193 Put p = new Put(Bytes.toBytes(i + 1)); 194 p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value)); 195 puts.add(p); 196 } 197 t.put(puts); 198 for (int i = 0; i < NUM_ROWS; i++) { 199 Get get = new Get(Bytes.toBytes(i + 1)); 200 assertArrayEquals(Bytes.toBytes(value), t.get(get).getValue(FAMILY, QUALIFIER)); 201 } 202 } 203 } 204 205 private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) { 206 return new CustomTypeSafeMatcher<Map<String, Double>>( 207 "failed to find all the 5 positive JMX attributes for: " + regexp) { 208 209 @Override 210 protected boolean matchesSafely(final Map<String, Double> values) { 211 for (String key : values.keySet()) { 212 for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) { 213 if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) { 214 return true; 215 } 216 } 217 } 218 return false; 219 } 220 }; 221 } 222 223 /** 224 * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX 225 * @throws IOException when fails to retrieve jmx metrics. 226 */ 227 private Map<String, Double> readMetaTableJmxMetrics() throws IOException { 228 JMXConnector connector = null; 229 ObjectName target = null; 230 MBeanServerConnection mb = null; 231 try { 232 connector = 233 JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort)); 234 mb = connector.getMBeanServerConnection(); 235 236 @SuppressWarnings("JdkObsolete") 237 Hashtable<String, String> pairs = new Hashtable<>(); 238 pairs.put("service", "HBase"); 239 pairs.put("name", "RegionServer"); 240 pairs.put("sub", 241 "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics"); 242 target = new ObjectName("Hadoop", pairs); 243 MBeanInfo beanInfo = mb.getMBeanInfo(target); 244 245 Map<String, Double> existingAttrs = new HashMap<>(); 246 for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) { 247 Object value = mb.getAttribute(target, attrInfo.getName()); 248 if ( 249 attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX) && value instanceof Number 250 ) { 251 existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString())); 252 } 253 } 254 LOG.info("MBean Found: {}", target); 255 return existingAttrs; 256 } catch (Exception e) { 257 LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e); 258 if (mb != null) { 259 Set<ObjectInstance> instances = mb.queryMBeans(null, null); 260 Iterator<ObjectInstance> iterator = instances.iterator(); 261 LOG.debug("All the MBeans we found:"); 262 while (iterator.hasNext()) { 263 ObjectInstance instance = iterator.next(); 264 LOG.debug("Class and object name: {} [{}]", instance.getClassName(), 265 instance.getObjectName()); 266 } 267 } 268 } finally { 269 if (connector != null) { 270 try { 271 connector.close(); 272 } catch (Exception e) { 273 e.printStackTrace(); 274 } 275 } 276 } 277 return Collections.emptyMap(); 278 } 279 280 private void putData(int nrows) throws InterruptedException { 281 LOG.info("Putting {} rows in hbase:meta", nrows); 282 Thread[] threads = new Thread[nthreads]; 283 for (int i = 1; i <= nthreads; i++) { 284 threads[i - 1] = new PutThread(1, nrows); 285 } 286 startThreadsAndWaitToJoin(threads); 287 } 288 289 private void getData(int nrows) throws InterruptedException { 290 LOG.info("Getting {} rows from hbase:meta", nrows); 291 Thread[] threads = new Thread[nthreads]; 292 for (int i = 1; i <= nthreads; i++) { 293 threads[i - 1] = new GetThread(1, nrows); 294 } 295 startThreadsAndWaitToJoin(threads); 296 } 297 298 private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { 299 for (int i = 1; i <= nthreads; i++) { 300 threads[i - 1].start(); 301 } 302 for (int i = 1; i <= nthreads; i++) { 303 threads[i - 1].join(); 304 } 305 } 306 307 private class PutThread extends Thread { 308 int start; 309 int end; 310 311 PutThread(int start, int end) { 312 this.start = start; 313 this.end = end; 314 } 315 316 @Override 317 public void run() { 318 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 319 for (int i = start; i <= end; i++) { 320 Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 321 p.addColumn(cf, col, Bytes.toBytes("Value" + i)); 322 table.put(p); 323 } 324 } catch (IOException e) { 325 LOG.warn("Caught IOException while PutThread operation", e); 326 } 327 } 328 } 329 330 private class GetThread extends Thread { 331 int start; 332 int end; 333 334 GetThread(int start, int end) { 335 this.start = start; 336 this.end = end; 337 } 338 339 @Override 340 public void run() { 341 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 342 for (int i = start; i <= end; i++) { 343 Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 344 table.get(get); 345 } 346 } catch (IOException e) { 347 LOG.warn("Caught IOException while GetThread operation", e); 348 } 349 } 350 } 351}