001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.concurrent.CyclicBarrier;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.stream.IntStream;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CompatibilityFactory;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.test.MetricsAssertHelper;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.RegionServerTests;
034import org.junit.Before;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@Category({ RegionServerTests.class, MediumTests.class })
043public class TestMetricsTableAggregate {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
050
051  private static MetricsAssertHelper HELPER =
052    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
053
054  private String tableName = "testTableMetrics";
055  private String pre = "Namespace_default_table_" + tableName + "_metric_";
056
057  private MetricsTableWrapperStub tableWrapper;
058  private MetricsTable mt;
059  private MetricsRegionServerWrapper rsWrapper;
060  private MetricsRegionServer rsm;
061  private MetricsTableAggregateSource agg;
062
063  @BeforeClass
064  public static void classSetUp() {
065    HELPER.init();
066  }
067
068  @Before
069  public void setUp() {
070    tableWrapper = new MetricsTableWrapperStub(tableName);
071    mt = new MetricsTable(tableWrapper);
072    rsWrapper = new MetricsRegionServerWrapperStub();
073    Configuration conf = new Configuration();
074    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
075    agg = mt.getTableSourceAgg();
076  }
077
078  @Test
079  public void testRequestMetrics() throws IOException {
080    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
081    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
082    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
083  }
084
085  @Test
086  public void testRegionAndStoreMetrics() throws IOException {
087    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
088    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
089    HELPER.assertGauge(pre + "tableSize", 3000, agg);
090
091    HELPER.assertGauge(pre + "regionCount", 11, agg);
092    HELPER.assertGauge(pre + "storeCount", 22, agg);
093    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
094    HELPER.assertGauge(pre + "maxStoreFileCount", 8, agg);
095    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
096    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
097    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
098    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
099    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
100
101    HELPER.assertGauge(pre + "staticIndexSize", 101, agg);
102    HELPER.assertGauge(pre + "staticBloomSize", 111, agg);
103
104    HELPER.assertCounter(pre + "bloomFilterRequestsCount", 222, agg);
105    HELPER.assertCounter(pre + "bloomFilterNegativeResultsCount", 333, agg);
106    HELPER.assertCounter(pre + "bloomFilterEligibleRequestsCount", 444, agg);
107  }
108
109  @Test
110  public void testFlush() {
111    rsm.updateFlush(tableName, 1, 2, 3);
112    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
113    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
114    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
115    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
116    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
117
118    rsm.updateFlush(tableName, 10, 20, 30);
119    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
120    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
121    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
122    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
123    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
124  }
125
126  @Test
127  public void testCompaction() {
128    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
129    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
130    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
131    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
132    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
133    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
134    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
135
136    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
137    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
138    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
139    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
140    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
141    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
142    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
143
144    // do major compaction
145    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
146
147    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
148    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
149    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
150    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
151    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
152    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
153
154    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
155    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
156    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
157    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
158    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
159    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
160  }
161
162  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
163    try {
164      for (int i = 0; i < round; i++) {
165        String tn = tableName + "-" + i;
166        barrier.await(10, TimeUnit.SECONDS);
167        rsm.updateFlush(tn, 100, 1000, 500);
168      }
169    } catch (Exception e) {
170      LOG.warn("Failed to update metrics", e);
171      succ.set(false);
172    }
173  }
174
175  @Test
176  public void testConcurrentUpdate() throws InterruptedException {
177    int threadNumber = 10;
178    int round = 100;
179    AtomicBoolean succ = new AtomicBoolean(true);
180    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
181    Thread[] threads = IntStream.range(0, threadNumber)
182      .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
183      .toArray(Thread[]::new);
184    Stream.of(threads).forEach(Thread::start);
185    for (Thread t : threads) {
186      t.join();
187    }
188    assertTrue(succ.get());
189  }
190}