001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.util.concurrent.CyclicBarrier;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.stream.IntStream;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CompatibilityFactory;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.test.MetricsAssertHelper;
032import org.apache.hadoop.hbase.testclassification.RegionServerTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.Before;
035import org.junit.BeforeClass;
036import org.junit.ClassRule;
037import org.junit.Test;
038import org.junit.experimental.categories.Category;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@Category({ RegionServerTests.class, SmallTests.class })
043public class TestMetricsTableAggregate {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
050
051  private static MetricsAssertHelper HELPER =
052    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
053
054  private String tableName = "testTableMetrics";
055  private String pre = "Namespace_default_table_" + tableName + "_metric_";
056
057  private MetricsTableWrapperStub tableWrapper;
058  private MetricsTable mt;
059  private MetricsRegionServerWrapper rsWrapper;
060  private MetricsRegionServer rsm;
061  private MetricsTableAggregateSource agg;
062
063  @BeforeClass
064  public static void classSetUp() {
065    HELPER.init();
066  }
067
068  @Before
069  public void setUp() {
070    tableWrapper = new MetricsTableWrapperStub(tableName);
071    mt = new MetricsTable(tableWrapper);
072    rsWrapper = new MetricsRegionServerWrapperStub();
073    Configuration conf = new Configuration();
074    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
075    agg = mt.getTableSourceAgg();
076  }
077
078  @Test
079  public void testRequestMetrics() throws IOException {
080    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
081    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
082    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
083  }
084
085  @Test
086  public void testRegionAndStoreMetrics() throws IOException {
087    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
088    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
089    HELPER.assertGauge(pre + "tableSize", 3000, agg);
090
091    HELPER.assertGauge(pre + "regionCount", 11, agg);
092    HELPER.assertGauge(pre + "storeCount", 22, agg);
093    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
094    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
095    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
096    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
097    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
098    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
099  }
100
101  @Test
102  public void testFlush() {
103    rsm.updateFlush(tableName, 1, 2, 3);
104    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
105    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
106    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
107    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
108    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
109
110    rsm.updateFlush(tableName, 10, 20, 30);
111    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
112    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
113    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
114    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
115    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
116  }
117
118  @Test
119  public void testCompaction() {
120    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
121    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
122    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
123    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
124    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
125    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
126    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
127
128    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
129    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
130    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
131    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
132    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
133    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
134    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
135
136    // do major compaction
137    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
138
139    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
140    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
141    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
142    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
143    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
144    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
145
146    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
147    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
148    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
149    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
150    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
151    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
152  }
153
154  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
155    try {
156      for (int i = 0; i < round; i++) {
157        String tn = tableName + "-" + i;
158        barrier.await(10, TimeUnit.SECONDS);
159        rsm.updateFlush(tn, 100, 1000, 500);
160      }
161    } catch (Exception e) {
162      LOG.warn("Failed to update metrics", e);
163      succ.set(false);
164    }
165  }
166
167  @Test
168  public void testConcurrentUpdate() throws InterruptedException {
169    int threadNumber = 10;
170    int round = 100;
171    AtomicBoolean succ = new AtomicBoolean(true);
172    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
173    Thread[] threads = IntStream.range(0, threadNumber)
174      .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
175      .toArray(Thread[]::new);
176    Stream.of(threads).forEach(Thread::start);
177    for (Thread t : threads) {
178      t.join();
179    }
180    assertTrue(succ.get());
181  }
182}