001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021import java.io.IOException;
022import java.util.concurrent.CyclicBarrier;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.stream.IntStream;
026import java.util.stream.Stream;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.CompatibilityFactory;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.test.MetricsAssertHelper;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.testclassification.RegionServerTests;
033import org.junit.Before;
034import org.junit.BeforeClass;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Category({ RegionServerTests.class, MediumTests.class })
042public class TestMetricsTableAggregate {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestMetricsTableAggregate.class);
047
048  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
049
050  private static MetricsAssertHelper HELPER =
051    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
052
053  private String tableName = "testTableMetrics";
054  private String pre = "Namespace_default_table_" + tableName + "_metric_";
055
056  private MetricsTableWrapperStub tableWrapper;
057  private MetricsTable mt;
058  private MetricsRegionServerWrapper rsWrapper;
059  private MetricsRegionServer rsm;
060  private MetricsTableAggregateSource agg;
061
062  @BeforeClass
063  public static void classSetUp() {
064    HELPER.init();
065  }
066
067  @Before
068  public void setUp() {
069    tableWrapper = new MetricsTableWrapperStub(tableName);
070    mt = new MetricsTable(tableWrapper);
071    rsWrapper = new MetricsRegionServerWrapperStub();
072    Configuration conf = new Configuration();
073    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
074    agg = mt.getTableSourceAgg();
075  }
076
077  @Test
078  public void testRequestMetrics() throws IOException {
079    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
080    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
081    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
082  }
083
084  @Test
085  public void testRegionAndStoreMetrics() throws IOException {
086    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
087    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
088    HELPER.assertGauge(pre + "tableSize", 3000, agg);
089
090    HELPER.assertGauge(pre + "regionCount", 11, agg);
091    HELPER.assertGauge(pre + "storeCount", 22, agg);
092    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
093    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
094    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
095    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
096    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
097    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
098  }
099
100  @Test
101  public void testFlush() {
102    rsm.updateFlush(tableName, 1, 2, 3);
103    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
104    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
105    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
106    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
107    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
108
109    rsm.updateFlush(tableName, 10, 20, 30);
110    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
111    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
112    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
113    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
114    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
115  }
116
117  @Test
118  public void testCompaction() {
119    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
120    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
121    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
122    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
123    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
124    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
125    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
126
127    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
128    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
129    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
130    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
131    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
132    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
133    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
134
135    // do major compaction
136    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
137
138    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
139    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
140    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
141    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
142    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
143    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
144
145    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
146    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
147    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
148    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
149    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
150    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
151  }
152
153  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
154    try {
155      for (int i = 0; i < round; i++) {
156        String tn = tableName + "-" + i;
157        barrier.await(10, TimeUnit.SECONDS);
158        rsm.updateFlush(tn, 100, 1000, 500);
159      }
160    } catch (Exception e) {
161      LOG.warn("Failed to update metrics", e);
162      succ.set(false);
163    }
164  }
165
166  @Test
167  public void testConcurrentUpdate() throws InterruptedException {
168    int threadNumber = 10;
169    int round = 100;
170    AtomicBoolean succ = new AtomicBoolean(true);
171    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
172    Thread[] threads = IntStream.range(0, threadNumber)
173      .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
174      .toArray(Thread[]::new);
175    Stream.of(threads).forEach(Thread::start);
176    for (Thread t : threads) {
177      t.join();
178    }
179    assertTrue(succ.get());
180  }
181}