001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertTrue; 021 022import java.io.IOException; 023import java.util.concurrent.CyclicBarrier; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.stream.IntStream; 027import java.util.stream.Stream; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.CompatibilityFactory; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.test.MetricsAssertHelper; 032import org.apache.hadoop.hbase.testclassification.RegionServerTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.junit.Before; 035import org.junit.BeforeClass; 036import org.junit.ClassRule; 037import org.junit.Test; 038import org.junit.experimental.categories.Category; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042@Category({ RegionServerTests.class, SmallTests.class }) 043public class TestMetricsTableAggregate { 044 045 @ClassRule 046 public static final HBaseClassTestRule CLASS_RULE = 047 HBaseClassTestRule.forClass(TestMetricsTableAggregate.class); 048 049 private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class); 050 051 private static MetricsAssertHelper HELPER = 052 CompatibilityFactory.getInstance(MetricsAssertHelper.class); 053 054 private String tableName = "testTableMetrics"; 055 private String pre = "Namespace_default_table_" + tableName + "_metric_"; 056 057 private MetricsTableWrapperStub tableWrapper; 058 private MetricsTable mt; 059 private MetricsRegionServerWrapper rsWrapper; 060 private MetricsRegionServer rsm; 061 private MetricsTableAggregateSource agg; 062 063 @BeforeClass 064 public static void classSetUp() { 065 HELPER.init(); 066 } 067 068 @Before 069 public void setUp() { 070 tableWrapper = new MetricsTableWrapperStub(tableName); 071 mt = new MetricsTable(tableWrapper); 072 rsWrapper = new MetricsRegionServerWrapperStub(); 073 Configuration conf = new Configuration(); 074 rsm = new MetricsRegionServer(rsWrapper, conf, mt); 075 agg = mt.getTableSourceAgg(); 076 } 077 078 @Test 079 public void testRequestMetrics() throws IOException { 080 HELPER.assertCounter(pre + "readRequestCount", 10, agg); 081 HELPER.assertCounter(pre + "writeRequestCount", 20, agg); 082 HELPER.assertCounter(pre + "totalRequestCount", 30, agg); 083 } 084 085 @Test 086 public void testRegionAndStoreMetrics() throws IOException { 087 HELPER.assertGauge(pre + "memstoreSize", 1000, agg); 088 HELPER.assertGauge(pre + "storeFileSize", 2000, agg); 089 HELPER.assertGauge(pre + "tableSize", 3000, agg); 090 091 HELPER.assertGauge(pre + "regionCount", 11, agg); 092 HELPER.assertGauge(pre + "storeCount", 22, agg); 093 HELPER.assertGauge(pre + "storeFileCount", 33, agg); 094 HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg); 095 HELPER.assertGauge(pre + "minStoreFileAge", 55, agg); 096 HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg); 097 HELPER.assertGauge(pre + "numReferenceFiles", 77, agg); 098 HELPER.assertGauge(pre + "averageRegionSize", 88, agg); 099 } 100 101 @Test 102 public void testFlush() { 103 rsm.updateFlush(tableName, 1, 2, 3); 104 HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg); 105 HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg); 106 HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg); 107 HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg); 108 HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg); 109 110 rsm.updateFlush(tableName, 10, 20, 30); 111 HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg); 112 HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg); 113 HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg); 114 HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg); 115 HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg); 116 } 117 118 @Test 119 public void testCompaction() { 120 rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5); 121 HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg); 122 HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg); 123 HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg); 124 HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg); 125 HELPER.assertCounter(pre + "compactedInputBytes", 4, agg); 126 HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg); 127 128 rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50); 129 HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg); 130 HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg); 131 HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg); 132 HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg); 133 HELPER.assertCounter(pre + "compactedInputBytes", 44, agg); 134 HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg); 135 136 // do major compaction 137 rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500); 138 139 HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg); 140 HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg); 141 HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg); 142 HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg); 143 HELPER.assertCounter(pre + "compactedInputBytes", 444, agg); 144 HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg); 145 146 HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg); 147 HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg); 148 HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg); 149 HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg); 150 HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg); 151 HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg); 152 } 153 154 private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) { 155 try { 156 for (int i = 0; i < round; i++) { 157 String tn = tableName + "-" + i; 158 barrier.await(10, TimeUnit.SECONDS); 159 rsm.updateFlush(tn, 100, 1000, 500); 160 } 161 } catch (Exception e) { 162 LOG.warn("Failed to update metrics", e); 163 succ.set(false); 164 } 165 } 166 167 @Test 168 public void testConcurrentUpdate() throws InterruptedException { 169 int threadNumber = 10; 170 int round = 100; 171 AtomicBoolean succ = new AtomicBoolean(true); 172 CyclicBarrier barrier = new CyclicBarrier(threadNumber); 173 Thread[] threads = IntStream.range(0, threadNumber) 174 .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i)) 175 .toArray(Thread[]::new); 176 Stream.of(threads).forEach(Thread::start); 177 for (Thread t : threads) { 178 t.join(); 179 } 180 assertTrue(succ.get()); 181 } 182}