001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022import static org.junit.jupiter.api.Assertions.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collections; 028import java.util.HashMap; 029import java.util.Hashtable; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import java.util.concurrent.ThreadLocalRandom; 036import javax.management.MBeanAttributeInfo; 037import javax.management.MBeanInfo; 038import javax.management.MBeanServerConnection; 039import javax.management.ObjectInstance; 040import javax.management.ObjectName; 041import javax.management.remote.JMXConnector; 042import javax.management.remote.JMXConnectorFactory; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.JMXListener; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 049import org.apache.hadoop.hbase.client.Get; 050import org.apache.hadoop.hbase.client.Put; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.testclassification.CoprocessorTests; 054import org.apache.hadoop.hbase.testclassification.LargeTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.hamcrest.CustomTypeSafeMatcher; 057import org.hamcrest.Matcher; 058import org.hamcrest.core.AllOf; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.BeforeAll; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066@Tag(CoprocessorTests.TAG) 067@Tag(LargeTests.TAG) 068public class TestMetaTableMetrics { 069 private static final Logger LOG = LoggerFactory.getLogger(TestMetaTableMetrics.class); 070 071 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 072 private static final TableName NAME1 = TableName.valueOf("TestExampleMetaTableMetricsOne"); 073 private static final byte[] FAMILY = Bytes.toBytes("f"); 074 private static final byte[] QUALIFIER = Bytes.toBytes("q"); 075 private static final ColumnFamilyDescriptor CFD = 076 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build(); 077 private static final int NUM_ROWS = 5; 078 private static final String value = "foo"; 079 private static final String METRICS_ATTRIBUTE_NAME_PREFIX = "MetaTable_"; 080 private static final List<String> METRICS_ATTRIBUTE_NAME_POSTFIXES = 081 Arrays.asList("_count", "_mean_rate", "_1min_rate", "_5min_rate", "_15min_rate"); 082 private static int connectorPort = 61120; 083 084 private final byte[] cf = Bytes.toBytes("info"); 085 private final byte[] col = Bytes.toBytes("any"); 086 private byte[] tablename; 087 private final int nthreads = 20; 088 089 @BeforeAll 090 public static void setupBeforeClass() throws Exception { 091 Configuration conf = UTIL.getConfiguration(); 092 // Set system coprocessor so it can be applied to meta regions 093 UTIL.getConfiguration().set("hbase.coprocessor.region.classes", 094 MetaTableMetrics.class.getName()); 095 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, JMXListener.class.getName()); 096 Random rand = ThreadLocalRandom.current(); 097 for (int i = 0; i < 10; i++) { 098 do { 099 int sign = i % 2 == 0 ? 1 : -1; 100 connectorPort += sign * rand.nextInt(100); 101 } while (!HBaseTestingUtil.available(connectorPort)); 102 try { 103 conf.setInt("regionserver.rmi.registry.port", connectorPort); 104 UTIL.startMiniCluster(1); 105 break; 106 } catch (Exception e) { 107 LOG.debug("Encountered exception when starting cluster. Trying port {}", connectorPort, e); 108 try { 109 // this is to avoid "IllegalStateException: A mini-cluster is already running" 110 UTIL.shutdownMiniCluster(); 111 } catch (Exception ex) { 112 LOG.debug("Encountered exception shutting down cluster", ex); 113 } 114 } 115 } 116 } 117 118 @AfterAll 119 public static void tearDown() throws Exception { 120 UTIL.shutdownMiniCluster(); 121 } 122 123 // Verifies that meta table metrics exist in jmx. In case of one table (one region) with a single 124 // client: 9 metrics 125 // are generated and for each metrics, there should be 5 JMX attributes produced. e.g. for one 126 // table, there should 127 // be 5 MetaTable_table_<TableName>_request attributes, such as: 128 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_count 129 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate 130 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate 131 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate 132 // - MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate 133 @Test 134 public void testMetaTableMetricsInJmx() throws Exception { 135 UTIL.getAdmin() 136 .createTable(TableDescriptorBuilder.newBuilder(NAME1).setColumnFamily(CFD).build()); 137 assertTrue(UTIL.getAdmin().isTableEnabled(NAME1)); 138 readWriteData(NAME1); 139 UTIL.deleteTable(NAME1); 140 141 UTIL.waitFor(30000, 2000, true, () -> { 142 Map<String, Double> jmxMetrics = readMetaTableJmxMetrics(); 143 boolean allMetricsFound = AllOf 144 .allOf(containsPositiveJmxAttributesFor("MetaTable_get_request"), 145 containsPositiveJmxAttributesFor("MetaTable_put_request"), 146 containsPositiveJmxAttributesFor("MetaTable_delete_request"), 147 containsPositiveJmxAttributesFor("MetaTable_region_.+_lossy_request"), 148 containsPositiveJmxAttributesFor("MetaTable_table_" + NAME1 + "_request"), 149 containsPositiveJmxAttributesFor("MetaTable_client_.+_put_request"), 150 containsPositiveJmxAttributesFor("MetaTable_client_.+_get_request"), 151 containsPositiveJmxAttributesFor("MetaTable_client_.+_delete_request"), 152 containsPositiveJmxAttributesFor("MetaTable_client_.+_lossy_request")) 153 .matches(jmxMetrics); 154 155 if (allMetricsFound) { 156 LOG.info("all the meta table metrics found with positive values: {}", jmxMetrics); 157 } else { 158 LOG.warn("couldn't find all the meta table metrics with positive values: {}", jmxMetrics); 159 } 160 return allMetricsFound; 161 }); 162 } 163 164 @Test 165 public void testConcurrentAccess() { 166 try { 167 tablename = Bytes.toBytes("hbase:meta"); 168 int numRows = 3000; 169 int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename)); 170 putData(numRows); 171 Thread.sleep(2000); 172 int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename)); 173 assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows); 174 getData(numRows); 175 } catch (InterruptedException e) { 176 LOG.info("Caught InterruptedException while testConcurrentAccess: {}", e.getMessage()); 177 fail(); 178 } catch (IOException e) { 179 LOG.info("Caught IOException while testConcurrentAccess: {}", e.getMessage()); 180 fail(); 181 } 182 } 183 184 private void readWriteData(TableName tableName) throws IOException { 185 try (Table t = UTIL.getConnection().getTable(tableName)) { 186 List<Put> puts = new ArrayList<>(NUM_ROWS); 187 for (int i = 0; i < NUM_ROWS; i++) { 188 Put p = new Put(Bytes.toBytes(i + 1)); 189 p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value)); 190 puts.add(p); 191 } 192 t.put(puts); 193 for (int i = 0; i < NUM_ROWS; i++) { 194 Get get = new Get(Bytes.toBytes(i + 1)); 195 assertArrayEquals(Bytes.toBytes(value), t.get(get).getValue(FAMILY, QUALIFIER)); 196 } 197 } 198 } 199 200 private Matcher<Map<String, Double>> containsPositiveJmxAttributesFor(final String regexp) { 201 return new CustomTypeSafeMatcher<Map<String, Double>>( 202 "failed to find all the 5 positive JMX attributes for: " + regexp) { 203 204 @Override 205 protected boolean matchesSafely(final Map<String, Double> values) { 206 for (String key : values.keySet()) { 207 for (String metricsNamePostfix : METRICS_ATTRIBUTE_NAME_POSTFIXES) { 208 if (key.matches(regexp + metricsNamePostfix) && values.get(key) > 0) { 209 return true; 210 } 211 } 212 } 213 return false; 214 } 215 }; 216 } 217 218 /** 219 * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX 220 * @throws IOException when fails to retrieve jmx metrics. 221 */ 222 private Map<String, Double> readMetaTableJmxMetrics() throws IOException { 223 JMXConnector connector = null; 224 ObjectName target = null; 225 MBeanServerConnection mb = null; 226 try { 227 connector = 228 JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort)); 229 mb = connector.getMBeanServerConnection(); 230 231 @SuppressWarnings("JdkObsolete") 232 Hashtable<String, String> pairs = new Hashtable<>(); 233 pairs.put("service", "HBase"); 234 pairs.put("name", "RegionServer"); 235 pairs.put("sub", 236 "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor.MetaTableMetrics"); 237 target = new ObjectName("Hadoop", pairs); 238 MBeanInfo beanInfo = mb.getMBeanInfo(target); 239 240 Map<String, Double> existingAttrs = new HashMap<>(); 241 for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) { 242 Object value = mb.getAttribute(target, attrInfo.getName()); 243 if ( 244 attrInfo.getName().startsWith(METRICS_ATTRIBUTE_NAME_PREFIX) && value instanceof Number 245 ) { 246 existingAttrs.put(attrInfo.getName(), Double.parseDouble(value.toString())); 247 } 248 } 249 LOG.info("MBean Found: {}", target); 250 return existingAttrs; 251 } catch (Exception e) { 252 LOG.warn("Failed to get Meta Table Metrics bean (will retry later): {}", target, e); 253 if (mb != null) { 254 Set<ObjectInstance> instances = mb.queryMBeans(null, null); 255 Iterator<ObjectInstance> iterator = instances.iterator(); 256 LOG.debug("All the MBeans we found:"); 257 while (iterator.hasNext()) { 258 ObjectInstance instance = iterator.next(); 259 LOG.debug("Class and object name: {} [{}]", instance.getClassName(), 260 instance.getObjectName()); 261 } 262 } 263 } finally { 264 if (connector != null) { 265 try { 266 connector.close(); 267 } catch (Exception e) { 268 e.printStackTrace(); 269 } 270 } 271 } 272 return Collections.emptyMap(); 273 } 274 275 private void putData(int nrows) throws InterruptedException { 276 LOG.info("Putting {} rows in hbase:meta", nrows); 277 Thread[] threads = new Thread[nthreads]; 278 for (int i = 1; i <= nthreads; i++) { 279 threads[i - 1] = new PutThread(1, nrows); 280 } 281 startThreadsAndWaitToJoin(threads); 282 } 283 284 private void getData(int nrows) throws InterruptedException { 285 LOG.info("Getting {} rows from hbase:meta", nrows); 286 Thread[] threads = new Thread[nthreads]; 287 for (int i = 1; i <= nthreads; i++) { 288 threads[i - 1] = new GetThread(1, nrows); 289 } 290 startThreadsAndWaitToJoin(threads); 291 } 292 293 private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException { 294 for (int i = 1; i <= nthreads; i++) { 295 threads[i - 1].start(); 296 } 297 for (int i = 1; i <= nthreads; i++) { 298 threads[i - 1].join(); 299 } 300 } 301 302 private class PutThread extends Thread { 303 int start; 304 int end; 305 306 PutThread(int start, int end) { 307 this.start = start; 308 this.end = end; 309 } 310 311 @Override 312 public void run() { 313 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 314 for (int i = start; i <= end; i++) { 315 Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 316 p.addColumn(cf, col, Bytes.toBytes("Value" + i)); 317 table.put(p); 318 } 319 } catch (IOException e) { 320 LOG.warn("Caught IOException while PutThread operation", e); 321 } 322 } 323 } 324 325 private class GetThread extends Thread { 326 int start; 327 int end; 328 329 GetThread(int start, int end) { 330 this.start = start; 331 this.end = end; 332 } 333 334 @Override 335 public void run() { 336 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) { 337 for (int i = start; i <= end; i++) { 338 Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i))); 339 table.get(get); 340 } 341 } catch (IOException e) { 342 LOG.warn("Caught IOException while GetThread operation", e); 343 } 344 } 345 } 346}