001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.util.concurrent.CyclicBarrier;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.stream.IntStream;
027import java.util.stream.Stream;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.CompatibilityFactory;
030import org.apache.hadoop.hbase.test.MetricsAssertHelper;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.apache.hadoop.hbase.testclassification.RegionServerTests;
033import org.junit.jupiter.api.BeforeAll;
034import org.junit.jupiter.api.BeforeEach;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.Test;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@Tag(RegionServerTests.TAG)
041@Tag(MediumTests.TAG)
042public class TestMetricsTableAggregate {
043
044  private static final Logger LOG = LoggerFactory.getLogger(TestMetricsTableAggregate.class);
045
046  private static MetricsAssertHelper HELPER =
047    CompatibilityFactory.getInstance(MetricsAssertHelper.class);
048
049  private String tableName = "testTableMetrics";
050  private String pre = "Namespace_default_table_" + tableName + "_metric_";
051
052  private MetricsTableWrapperStub tableWrapper;
053  private MetricsTable mt;
054  private MetricsRegionServerWrapper rsWrapper;
055  private MetricsRegionServer rsm;
056  private MetricsTableAggregateSource agg;
057
058  @BeforeAll
059  public static void classSetUp() {
060    HELPER.init();
061  }
062
063  @BeforeEach
064  public void setUp() {
065    tableWrapper = new MetricsTableWrapperStub(tableName);
066    mt = new MetricsTable(tableWrapper);
067    rsWrapper = new MetricsRegionServerWrapperStub();
068    Configuration conf = new Configuration();
069    rsm = new MetricsRegionServer(rsWrapper, conf, mt);
070    agg = mt.getTableSourceAgg();
071  }
072
073  @Test
074  public void testRequestMetrics() throws IOException {
075    HELPER.assertCounter(pre + "readRequestCount", 10, agg);
076    HELPER.assertCounter(pre + "writeRequestCount", 20, agg);
077    HELPER.assertCounter(pre + "totalRequestCount", 30, agg);
078  }
079
080  @Test
081  public void testRegionAndStoreMetrics() throws IOException {
082    HELPER.assertGauge(pre + "memstoreSize", 1000, agg);
083    HELPER.assertGauge(pre + "memstoreHeapSize", 1001, agg);
084    HELPER.assertGauge(pre + "memstoreOffHeapSize", 1002, agg);
085    HELPER.assertGauge(pre + "storeFileSize", 2000, agg);
086    HELPER.assertGauge(pre + "tableSize", 3000, agg);
087
088    HELPER.assertGauge(pre + "regionCount", 11, agg);
089    HELPER.assertGauge(pre + "storeCount", 22, agg);
090    HELPER.assertGauge(pre + "storeFileCount", 33, agg);
091    HELPER.assertGauge(pre + "maxStoreFileCount", 8, agg);
092    HELPER.assertGauge(pre + "maxStoreFileAge", 44, agg);
093    HELPER.assertGauge(pre + "minStoreFileAge", 55, agg);
094    HELPER.assertGauge(pre + "avgStoreFileAge", 66, agg);
095    HELPER.assertGauge(pre + "numReferenceFiles", 77, agg);
096    HELPER.assertGauge(pre + "averageRegionSize", 88, agg);
097
098    HELPER.assertGauge(pre + "staticIndexSize", 101, agg);
099    HELPER.assertGauge(pre + "staticBloomSize", 111, agg);
100
101    HELPER.assertCounter(pre + "bloomFilterRequestsCount", 222, agg);
102    HELPER.assertCounter(pre + "bloomFilterNegativeResultsCount", 333, agg);
103    HELPER.assertCounter(pre + "bloomFilterEligibleRequestsCount", 444, agg);
104  }
105
106  @Test
107  public void testPerStoreFileSize() {
108    String perCfPre =
109      "Namespace_default_table_" + tableName + "_columnfamily_info_metric_storeFileSize";
110    HELPER.assertGauge(perCfPre, 2000, agg);
111  }
112
113  @Test
114  public void testFlush() {
115    rsm.updateFlush(tableName, 1, 2, 3);
116    HELPER.assertCounter(pre + "flushTime_num_ops", 1, agg);
117    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 1, agg);
118    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 1, agg);
119    HELPER.assertCounter(pre + "flushedMemstoreBytes", 2, agg);
120    HELPER.assertCounter(pre + "flushedOutputBytes", 3, agg);
121
122    rsm.updateFlush(tableName, 10, 20, 30);
123    HELPER.assertCounter(pre + "flushTime_num_ops", 2, agg);
124    HELPER.assertCounter(pre + "flushMemstoreSize_num_ops", 2, agg);
125    HELPER.assertCounter(pre + "flushOutputSize_num_ops", 2, agg);
126    HELPER.assertCounter(pre + "flushedMemstoreBytes", 22, agg);
127    HELPER.assertCounter(pre + "flushedOutputBytes", 33, agg);
128  }
129
130  @Test
131  public void testCompaction() {
132    rsm.updateCompaction(tableName, false, 1, 2, 3, 4, 5);
133    HELPER.assertCounter(pre + "compactionTime_num_ops", 1, agg);
134    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 1, agg);
135    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 1, agg);
136    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 1, agg);
137    HELPER.assertCounter(pre + "compactedInputBytes", 4, agg);
138    HELPER.assertCounter(pre + "compactedoutputBytes", 5, agg);
139
140    rsm.updateCompaction(tableName, false, 10, 20, 30, 40, 50);
141    HELPER.assertCounter(pre + "compactionTime_num_ops", 2, agg);
142    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 2, agg);
143    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 2, agg);
144    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 2, agg);
145    HELPER.assertCounter(pre + "compactedInputBytes", 44, agg);
146    HELPER.assertCounter(pre + "compactedoutputBytes", 55, agg);
147
148    // do major compaction
149    rsm.updateCompaction(tableName, true, 100, 200, 300, 400, 500);
150
151    HELPER.assertCounter(pre + "compactionTime_num_ops", 3, agg);
152    HELPER.assertCounter(pre + "compactionInputFileCount_num_ops", 3, agg);
153    HELPER.assertCounter(pre + "compactionInputSize_num_ops", 3, agg);
154    HELPER.assertCounter(pre + "compactionOutputFileCount_num_ops", 3, agg);
155    HELPER.assertCounter(pre + "compactedInputBytes", 444, agg);
156    HELPER.assertCounter(pre + "compactedoutputBytes", 555, agg);
157
158    HELPER.assertCounter(pre + "majorCompactionTime_num_ops", 1, agg);
159    HELPER.assertCounter(pre + "majorCompactionInputFileCount_num_ops", 1, agg);
160    HELPER.assertCounter(pre + "majorCompactionInputSize_num_ops", 1, agg);
161    HELPER.assertCounter(pre + "majorCompactionOutputFileCount_num_ops", 1, agg);
162    HELPER.assertCounter(pre + "majorCompactedInputBytes", 400, agg);
163    HELPER.assertCounter(pre + "majorCompactedoutputBytes", 500, agg);
164  }
165
166  @Test
167  public void testSplitRequest() {
168    rsm.incrSplitRequest(null);
169    rsm.incrSplitRequest(tableName);
170    HELPER.assertCounter(pre + "splitRequestCount", 1, agg);
171  }
172
173  private void update(AtomicBoolean succ, int round, CyclicBarrier barrier) {
174    try {
175      for (int i = 0; i < round; i++) {
176        String tn = tableName + "-" + i;
177        barrier.await(10, TimeUnit.SECONDS);
178        rsm.updateFlush(tn, 100, 1000, 500);
179      }
180    } catch (Exception e) {
181      LOG.warn("Failed to update metrics", e);
182      succ.set(false);
183    }
184  }
185
186  @Test
187  public void testConcurrentUpdate() throws InterruptedException {
188    int threadNumber = 10;
189    int round = 100;
190    AtomicBoolean succ = new AtomicBoolean(true);
191    CyclicBarrier barrier = new CyclicBarrier(threadNumber);
192    Thread[] threads = IntStream.range(0, threadNumber)
193      .mapToObj(i -> new Thread(() -> update(succ, round, barrier), "Update-Worker-" + i))
194      .toArray(Thread[]::new);
195    Stream.of(threads).forEach(Thread::start);
196    for (Thread t : threads) {
197      t.join();
198    }
199    assertTrue(succ.get());
200  }
201}