001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.coprocessor; 021 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.Hashtable; 032import java.util.Iterator; 033import java.util.List; 034import java.util.Map; 035import java.util.Random; 036import java.util.Set; 037import javax.management.MBeanAttributeInfo; 038import javax.management.MBeanInfo; 039import javax.management.MBeanServerConnection; 040import javax.management.ObjectInstance; 041import javax.management.ObjectName; 042import javax.management.remote.JMXConnector; 043import javax.management.remote.JMXConnectorFactory; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.HBaseClassTestRule; 046import org.apache.hadoop.hbase.HBaseTestingUtility; 047import org.apache.hadoop.hbase.JMXListener; 048import org.apache.hadoop.hbase.TableName; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Get; 052import org.apache.hadoop.hbase.client.Put; 053import org.apache.hadoop.hbase.client.Table; 054import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 055import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.hamcrest.CustomTypeSafeMatcher; 059import org.hamcrest.Matcher; 060import org.hamcrest.core.AllOf; 061import org.junit.AfterClass; 062import org.junit.BeforeClass; 063import org.junit.ClassRule; 064import org.junit.Test; 065import org.junit.experimental.categories.Category; 066import org.slf4j.Logger; 067import org.slf4j.LoggerFactory; 068 069@Category({ CoprocessorTests.class, LargeTests.class }) 070public class TestMetaTableMetrics { 071 072 @ClassRule 073 public static final HBaseClassTestRule CLASS_RULE = 074 HBaseClassTestRule.forClass(TestMetaTableMetrics.class); 075 private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class); 076 077 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 078 private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne"); 079 private static final byte[] FAMILY = Bytes.toBytes("f"); 080 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 081 private static final ColumnFamilyDescriptor CFD = 082 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build(); 083 private static final int NUM_ROWS = 5; 084 private static final String value = "foo"; 085 private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_"; 086 private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES = 087 Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate"); 088 private static int connectorPort = 61120; 089 090 private final byte[] cf = Bytes.toBytes("info"); 091 private final byte[] col = Bytes.toBytes("any"); 092 private byte[] tablename; 093 private final int nthreads = 20; 094 095 @BeforeClass 096 public static void setupBeforeClass() throws Exception { 097 Configuration conf = UTIL.getConfiguration(); 098 // Set system coprocessor so it can be applied to meta regions 099 UTIL.getConfiguration().set("hbase.coprocessor.region.classes", 100 MetaTableMetrics.class.getName()); 101 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName()); 102 Random rand = new Random(); 103 for (int i = 0; i < 10; i++) { 104 do { 105 int sign = i % 2 == 0 ? 1 : -1; 106 connectorPort += sign * rand.nextInt(100); 107 } while (!HBaseTestingUtility.available(connectorPort)); 108 try { 109 conf.setInt("regionserver.rmi.registry.port", connectorPort); 110 UTIL.startMiniCluster(1); 111 break; 112 } catch (Exception e) { 113 LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e); 114 try { 115 // this is to avoid "IllegalStateException: A mini-cluster is already running" 116 UTIL.shutdownMiniCluster(); 117 } catch (Exception ex) { 118 LOG.debug("Encountered exception shutting down cluster", ex); 119 } 120 } 121 } 122 } 123 124 @AfterClass 125 public static void tearDown() throws Exception { 126 UTIL.shutdownMiniCluster(); 127 } 128 129 // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single 130 // client: 9 metrics 131 // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one 132 // table, there should 133 // be 5 MetaTable_table_<TableName>_request attributes, such as: 134 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count 135 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate 136 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate 137 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate 138 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate 139 @Test 140 public void testMetaTableMetricsInJmx() throws Exception { 141 UTIL.getAdmin() 142 .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build()); 143 assertTrue(UTIL.getAdmin().isTableEnabled(NAME1)); 144 readWriteData(NAME1); 145 UTIL.deleteTable(NAME1); 146 147 UTIL.waitFor(30000, 2000, true, () -> { 148 Map<String, Double> jmxMetrics = readMetaTableJmxMetrics(); 149 boolean allMetricsFound = AllOf.allOf( 150 containsPositiveJmxAttributesFor("MetaTable_get_request"), 151 containsPositiveJmxAttributesFor("MetaTable_put_request"), 152 containsPositiveJmxAttributesFor("MetaTable_delete_request"), 153 containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"), 154 containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"), 155 containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"), 156 containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"), 157 containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"), 158 containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request") 159 ).matches(jmxMetrics); 160 161 if (allMetricsFound) { 162 LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics); 163 } else { 164 LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics); 165 } 166 return allMetricsFound; 167 }); 168 } 169 170 @Test 171 public void testConcurrentAccess() { 172 try { 173 tablename = Bytes.toBytes("hbase:meta"); 174 int numRows = 3000; 175 int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); 176 putData(numRows); 177 Thread.sleep(2000); 178 int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); 179 assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); 180 getData(numRows); 181 } catch (InterruptedException e) { 182 LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage()); 183 fail(); 184 } catch (IOException e) { 185 LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage()); 186 fail(); 187 } 188 } 189 190 private void readWriteData(TableName tableName) throws IOException { 191 try (Table t = UTIL.getConnection().getTable(tableName)) { 192 List<Put> puts = new ArrayList<>(NUM_ROWS); 193 for (int i = 0; i < NUM_ROWS; i++) { 194 Put p = new Put(Bytes.toBytes(i + 1)); 195 p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value)); 196 puts.add(p); 197 } 198 t.put(puts); 199 for (int i = 0; i < NUM_ROWS; i++) { 200 Get get = new Get(Bytes.toBytes(i + 1)); 201 assertArrayEquals(Bytes.toBytes(value), t.get(get).getValue(FAMILY, QUALIFIER)); 202 } 203 } 204 } 205 206 private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) { 207 return new CustomTypeSafeMatcher<Map<String, Double>>( 208 "failed to find all the 5 positive JMX attributes for: " + regexp) { 209 210 @Override 211 protected boolean matchesSafely(final Map<String, Double> values) { 212 for (String key : values.keySet()) { 213 for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) { 214 if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) { 215 return true; 216 } 217 } 218 } 219 return false; 220 } 221 }; 222 } 223 224 /** 225 * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX 226 * @throws IOException when fails to retrieve jmx metrics. 227 */ 228 private Map<String, Double> readMetaTableJmxMetrics() throws IOException { 229 JMXConnector connector = null; 230 ObjectName target = null; 231 MBeanServerConnection mb = null; 232 try { 233 connector = 234 JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort)); 235 mb = connector.getMBeanServerConnection(); 236 237 @SuppressWarnings("JdkObsolete") 238 Hashtable<String, String> pairs = new Hashtable<>(); 239 pairs.put("service", "HBase"); 240 pairs.put("name", "RegionServer"); 241 pairs.put("sub", 242 "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics"); 243 target = new ObjectName("Hadoop", pairs); 244 MBeanInfo beanInfo = mb.getMBeanInfo(target); 245 246 Map<String, Double> existingAttrs = new HashMap<>(); 247 for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) { 248 Object value = mb.getAttribute(target, attrInfo.getName()); 249 if (attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX) 250 && value instanceof Number) { 251 existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString())); 252 } 253 } 254 LOG.info("MBean Found: {}", target); 255 return existingAttrs; 256 } catch (Exception e) { 257 LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e); 258 if (mb != null) { 259 Set<ObjectInstance> instances = mb.queryMBeans(null, null); 260 Iterator<ObjectInstance> iterator = instances.iterator(); 261 LOG.debug("All the MBeans we found:"); 262 while (iterator.hasNext()) { 263 ObjectInstance instance = iterator.next(); 264 LOG.debug("Class and object name: {} [{}]", instance.getClassName(), 265 instance.getObjectName()); 266 } 267 } 268 } finally { 269 if (connector != null) { 270 try { 271 connector.close(); 272 } catch (Exception e) { 273 e.printStackTrace(); 274 } 275 } 276 } 277 return Collections.emptyMap(); 278 } 279 280 private void putData(int nrows) throws InterruptedException { 281 LOG.info("Putting {} rows in hbase:meta", nrows); 282 Thread[] threads = new Thread[nthreads]; 283 for (int i = 1; i <= nthreads; i++) { 284 threads[i - 1] = new PutThread(1, nrows); 285 } 286 startThreadsAndWaitToJoin(threads); 287 } 288 289 private void getData(int nrows) throws InterruptedException { 290 LOG.info("Getting {} rows from hbase:meta", nrows); 291 Thread[] threads = new Thread[nthreads]; 292 for (int i = 1; i <= nthreads; i++) { 293 threads[i - 1] = new GetThread(1, nrows); 294 } 295 startThreadsAndWaitToJoin(threads); 296 } 297 298 private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { 299 for (int i = 1; i <= nthreads; i++) { 300 threads[i - 1].start(); 301 } 302 for (int i = 1; i <= nthreads; i++) { 303 threads[i - 1].join(); 304 } 305 } 306 307 private class PutThread extends Thread { 308 int start; 309 int end; 310 311 PutThread(int start, int end) { 312 this.start = start; 313 this.end = end; 314 } 315 316 @Override 317 public void run() { 318 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 319 for (int i = start; i <= end; i++) { 320 Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 321 p.addColumn(cf, col, Bytes.toBytes("Value" + i)); 322 table.put(p); 323 } 324 } catch (IOException e) { 325 LOG.warn("Caught IOException while PutThread operation", e); 326 } 327 } 328 } 329 330 private class GetThread extends Thread { 331 int start; 332 int end; 333 334 GetThread(int start, int end) { 335 this.start = start; 336 this.end = end; 337 } 338 339 @Override 340 public void run() { 341 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 342 for (int i = start; i <= end; i++) { 343 Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 344 table.get(get); 345 } 346 } catch (IOException e) { 347 LOG.warn("Caught IOException while GetThread operation", e); 348 } 349 } 350 } 351}