001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.concurrent.CyclicBarrier;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.stream.IntStream;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CompatibilityFactory;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.test.MetricsAssertHelper;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.RegionServerTests;
034import org.junit.Before;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@Category({ RegionServerTests.class, MediumTests.class })
043public class TestMetricsTableAggregate {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
050
051  private static MetricsAssertHelper HELPER =
052    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
053
054  private String tableName = "testTableMetrics";
055  private String pre = "Namespace_default_table_" + tableName + "_metric_";
056
057  private MetricsTableWrapperStub tableWrapper;
058  private MetricsTable mt;
059  private MetricsRegionServerWrapper rsWrapper;
060  private MetricsRegionServer rsm;
061  private MetricsTableAggregateSource agg;
062
063  @BeforeClass
064  public static void classSetUp() {
065    HELPER.init();
066  }
067
068  @Before
069  public void setUp() {
070    tableWrapper = new MetricsTableWrapperStub(tableName);
071    mt = new MetricsTable(tableWrapper);
072    rsWrapper = new MetricsRegionServerWrapperStub();
073    Configuration conf = new Configuration();
074    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
075    agg = mt.getTableSourceAgg();
076  }
077
078  @Test
079  public void testRequestMetrics() throws IOException {
080    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
081    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
082    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
083  }
084
085  @Test
086  public void testRegionAndStoreMetrics() throws IOException {
087    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
088    HELPER.assertGauge(pre + "memstoreHeapSize", 1001, agg);
089    HELPER.assertGauge(pre + "memstoreOffHeapSize", 1002, agg);
090    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
091    HELPER.assertGauge(pre + "tableSize", 3000, agg);
092
093    HELPER.assertGauge(pre + "regionCount", 11, agg);
094    HELPER.assertGauge(pre + "storeCount", 22, agg);
095    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
096    HELPER.assertGauge(pre + "maxStoreFileCount", 8, agg);
097    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
098    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
099    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
100    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
101    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
102
103    HELPER.assertGauge(pre + "staticIndexSize", 101, agg);
104    HELPER.assertGauge(pre + "staticBloomSize", 111, agg);
105
106    HELPER.assertCounter(pre + "bloomFilterRequestsCount", 222, agg);
107    HELPER.assertCounter(pre + "bloomFilterNegativeResultsCount", 333, agg);
108    HELPER.assertCounter(pre + "bloomFilterEligibleRequestsCount", 444, agg);
109  }
110
111  @Test
112  public void testFlush() {
113    rsm.updateFlush(tableName, 1, 2, 3);
114    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
115    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
116    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
117    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
118    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
119
120    rsm.updateFlush(tableName, 10, 20, 30);
121    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
122    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
123    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
124    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
125    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
126  }
127
128  @Test
129  public void testCompaction() {
130    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
131    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
132    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
133    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
134    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
135    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
136    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
137
138    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
139    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
140    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
141    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
142    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
143    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
144    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
145
146    // do major compaction
147    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
148
149    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
150    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
151    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
152    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
153    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
154    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
155
156    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
157    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
158    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
159    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
160    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
161    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
162  }
163
164  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
165    try {
166      for (int i = 0; i < round; i++) {
167        String tn = tableName + "-" + i;
168        barrier.await(10, TimeUnit.SECONDS);
169        rsm.updateFlush(tn, 100, 1000, 500);
170      }
171    } catch (Exception e) {
172      LOG.warn("Failed to update metrics", e);
173      succ.set(false);
174    }
175  }
176
177  @Test
178  public void testConcurrentUpdate() throws InterruptedException {
179    int threadNumber = 10;
180    int round = 100;
181    AtomicBoolean succ = new AtomicBoolean(true);
182    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
183    Thread[] threads = IntStream.range(0, threadNumber)
184      .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
185      .toArray(Thread[]::new);
186    Stream.of(threads).forEach(Thread::start);
187    for (Thread t : threads) {
188      t.join();
189    }
190    assertTrue(succ.get());
191  }
192}