/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestMetricsTableAggregate {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);

  private static MetricsAssertHelper HELPER =
      CompatibilityFactory.getInstance(MetricsAssertHelper.class);

  private String tableName = "testTableMetrics";
  private String pre = "Namespace_default_table_" + tableName + "_metric_";

  private MetricsTableWrapperStub tableWrapper;
  private MetricsTable mt;
  private MetricsRegionServerWrapper rsWrapper;
  private MetricsRegionServer rsm;
  private MetricsTableAggregateSource agg;

  @BeforeClass
  public static void classSetUp() {
    HELPER.init();
  }

  @Before
  public void setUp() {
    tableWrapper = new MetricsTableWrapperStub(tableName);
    mt = new MetricsTable(tableWrapper);
    rsWrapper = new MetricsRegionServerWrapperStub();
    Configuration conf = new Configuration();
    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
    agg = mt.getTableSourceAgg();
  }

  @Test
  public void testRequestMetrics() throws IOException {
    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
  }

  @Test
  public void testRegionAndStoreMetrics() throws IOException {
    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
    HELPER.assertGauge(pre + "tableSize", 3000, agg);

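    // Region/store-level gauges; the expected values mirror what MetricsTableWrapperStub
    // reports for this table via the wrapper set up in setUp().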
    HELPER.assertGauge(pre + "regionCount", 11, agg);
    HELPER.assertGauge(pre + "storeCount", 22, agg);
    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
  }

  @Test
  public void testFlush() {
    rsm.updateFlush(tableName, 1, 2, 3);
    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);

    rsm.updateFlush(tableName, 10, 20, 30);
    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
  }

  @Test
  public void testCompaction() {
    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);

    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);

    // do major compaction
    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);

    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);

    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
  }

  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
    try {
      for (int i = 0; i < round; i++) {
        String tn = tableName + "-" + i;
        barrier.await(10, TimeUnit.SECONDS);
        rsm.updateFlush(tn, 100, 1000, 500);
      }
    } catch (Exception e) {
      LOG.warn("Failed to update metrics", e);
      succ.set(false);
    }
  }

  @Test
  public void testConcurrentUpdate() throws InterruptedException {
    int threadNumber = 10;
    int round = 100;
    AtomicBoolean succ = new AtomicBoolean(true);
    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
    Thread[] threads = IntStream.range(0, threadNumber)
        .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
        .toArray(Thread[]::new);
    Stream.of(threads).forEach(Thread::start);
    for (Thread t : threads) {
      t.join();
    }
    assertTrue(succ.get());
  }
}