001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import com.codahale.metrics.Counter;
025import com.codahale.metrics.RatioGauge;
026import com.codahale.metrics.RatioGauge.Ratio;
027import com.codahale.metrics.Timer;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.List;
031import java.util.Optional;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ThreadPoolExecutor;
034import java.util.stream.Stream;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.ipc.CallTimeoutException;
039import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.testclassification.ClientTests;
042import org.apache.hadoop.hbase.testclassification.MetricsTests;
043import org.apache.hadoop.hbase.testclassification.SmallTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.jupiter.api.AfterEach;
046import org.junit.jupiter.api.BeforeEach;
047import org.junit.jupiter.api.Tag;
048import org.junit.jupiter.api.TestTemplate;
049import org.junit.jupiter.params.provider.Arguments;
050
051import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
052
053import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
063
064@Tag(ClientTests.TAG)
065@Tag(MetricsTests.TAG)
066@Tag(SmallTests.TAG)
067@HBaseParameterizedTestTemplate
068public class TestMetricsConnection {
069
070  private static final Configuration conf = new Configuration();
071  private static MetricsConnection METRICS;
072  private static final ThreadPoolExecutor BATCH_POOL =
073    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
074
075  private static final String MOCK_CONN_STR = "mocked-connection";
076
077  public boolean tableMetricsEnabled;
078
079  public TestMetricsConnection(boolean tableMetricsEnabled) {
080    this.tableMetricsEnabled = tableMetricsEnabled;
081  }
082
083  public static Stream<Arguments> parameters() {
084    return Stream.of(Arguments.of(true), Arguments.of(false));
085  }
086
087  @BeforeEach
088  public void before() {
089    conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled);
090    METRICS =
091      MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
092  }
093
094  @AfterEach
095  public void after() {
096    MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
097  }
098
099  @TestTemplate
100  public void testMetricsConnectionScope() throws IOException {
101    Configuration conf = new Configuration();
102    String clusterId = "foo";
103    String scope = "testScope";
104    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
105
106    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
107    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
108    assertTrue(metrics.isPresent(), "Metrics should be present");
109    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
110      metrics.get().getMetricScope());
111    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
112    impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
113
114    metrics = impl.getConnectionMetrics();
115    assertTrue(metrics.isPresent(), "Metrics should be present");
116    assertEquals(scope, metrics.get().getMetricScope());
117  }
118
119  @TestTemplate
120  public void testMetricsWithMultiConnections() throws IOException {
121    Configuration conf = new Configuration();
122    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
123    conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
124
125    User user = User.getCurrent();
126
127    /* create multiple connections */
128    final int num = 3;
129    AsyncConnectionImpl impl;
130    List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
131    for (int i = 0; i < num; i++) {
132      impl = new AsyncConnectionImpl(conf, null, null, null, user);
133      connList.add(impl);
134    }
135
136    /* verify metrics presence */
137    impl = connList.get(0);
138    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
139    assertTrue(metrics.isPresent(), "Metrics should be present");
140
141    /* verify connection count in a shared metrics */
142    long count = metrics.get().getConnectionCount();
143    assertEquals(count, num, "Failed to verify connection count." + count);
144
145    /* close some connections */
146    for (int i = 0; i < num - 1; i++) {
147      connList.get(i).close();
148    }
149
150    /* verify metrics presence again */
151    impl = connList.get(num - 1);
152    metrics = impl.getConnectionMetrics();
153    assertTrue(metrics.isPresent(),
154      "Metrics should be present after some of connections are closed.");
155
156    /* verify count of remaining connections */
157    count = metrics.get().getConnectionCount();
158    assertEquals(count, 1, "Connection count suppose to be 1 but got: " + count);
159
160    /* shutdown */
161    impl.close();
162  }
163
164  @TestTemplate
165  public void testStaticMetrics() throws IOException {
166    final byte[] foo = Bytes.toBytes("foo");
167    String table = "TableX";
168    final RegionSpecifier region = RegionSpecifier.newBuilder()
169      .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build();
170    final int loop = 5;
171
172    for (int i = 0; i < loop; i++) {
173      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
174        TableName.valueOf(table),
175        GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(),
176        MetricsConnection.newCallStats(), null);
177      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
178        TableName.valueOf(table),
179        ScanRequest.newBuilder().setRegion(region)
180          .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(),
181        MetricsConnection.newCallStats(),
182        new RemoteWithExtrasException("java.io.IOException", null, false, false));
183      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
184        TableName.valueOf(table),
185        MultiRequest.newBuilder()
186          .addRegionAction(ClientProtos.RegionAction.newBuilder()
187            .addAction(
188              ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build())
189            .setRegion(region).build())
190          .build(),
191        MetricsConnection.newCallStats(),
192        new CallTimeoutException("test with CallTimeoutException"));
193      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
194        TableName.valueOf(table),
195        MutateRequest.newBuilder()
196          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
197          .setRegion(region).build(),
198        MetricsConnection.newCallStats(), null);
199      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
200        TableName.valueOf(table),
201        MutateRequest.newBuilder()
202          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
203          .setRegion(region).build(),
204        MetricsConnection.newCallStats(), null);
205      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
206        TableName.valueOf(table),
207        MutateRequest.newBuilder()
208          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
209          .setRegion(region).build(),
210        MetricsConnection.newCallStats(), null);
211      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
212        TableName.valueOf(table),
213        MutateRequest.newBuilder()
214          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
215          .build(),
216        MetricsConnection.newCallStats(),
217        new CallTimeoutException("test with CallTimeoutException"));
218    }
219
220    testRpcCallMetrics(table, loop);
221
222    String metricKey;
223    long metricVal;
224    Counter counter;
225
226    // remote exception
227    metricKey = "rpcRemoteExceptions_IOException";
228    counter = METRICS.getRpcCounters().get(metricKey);
229    metricVal = (counter != null) ? counter.getCount() : 0;
230    assertEquals(metricVal, loop, "metric: " + metricKey + " val: " + metricVal);
231
232    // local exception
233    metricKey = "rpcLocalExceptions_CallTimeoutException";
234    counter = METRICS.getRpcCounters().get(metricKey);
235    metricVal = (counter != null) ? counter.getCount() : 0;
236    assertEquals(metricVal, loop * 2, "metric: " + metricKey + " val: " + metricVal);
237
238    // total exception
239    metricKey = "rpcTotalExceptions";
240    counter = METRICS.getRpcCounters().get(metricKey);
241    metricVal = (counter != null) ? counter.getCount() : 0;
242    assertEquals(metricVal, loop * 3, "metric: " + metricKey + " val: " + metricVal);
243
244    testRpcCallTableMetrics(table, loop);
245
246    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
247      METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
248      METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
249      METRICS.getPutTracker() }) {
250      assertEquals(loop, t.callTimer.getCount(), "Failed to invoke callTimer on " + t);
251      assertEquals(loop, t.reqHist.getCount(), "Failed to invoke reqHist on " + t);
252      assertEquals(loop, t.respHist.getCount(), "Failed to invoke respHist on " + t);
253    }
254    RatioGauge executorMetrics =
255      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
256    RatioGauge metaMetrics =
257      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
258    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
259    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
260  }
261
262  private void testRpcCallTableMetrics(String table, int expectedVal) {
263    String metricKey;
264    Timer timer;
265    String numOpsSuffix = "_num_ops";
266    String p95Suffix = "_95th_percentile";
267    String p99Suffix = "_99th_percentile";
268    String service = ClientService.getDescriptor().getName();
269    for (String m : new String[] { "Get", "Scan", "Multi" }) {
270      metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table;
271      timer = METRICS.getRpcTimers().get(metricKey);
272      if (tableMetricsEnabled) {
273        long numOps = timer.getCount();
274        double p95 = timer.getSnapshot().get95thPercentile();
275        double p99 = timer.getSnapshot().get99thPercentile();
276        assertEquals(expectedVal, numOps,
277          "metric: " + metricKey + numOpsSuffix + " val: " + numOps);
278        assertTrue(p95 >= 0, "metric: " + metricKey + p95Suffix + " val: " + p95);
279        assertTrue(p99 >= 0, "metric: " + metricKey + p99Suffix + " val: " + p99);
280      } else {
281        assertNull(timer);
282      }
283    }
284
285    // Distinguish mutate types for mutate method.
286    String mutateMethod = "Mutate";
287    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
288      metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")"
289        + "_" + table;
290      timer = METRICS.getRpcTimers().get(metricKey);
291      if (tableMetricsEnabled) {
292        long numOps = timer.getCount();
293        double p95 = timer.getSnapshot().get95thPercentile();
294        double p99 = timer.getSnapshot().get99thPercentile();
295        assertEquals(expectedVal, numOps,
296          "metric: " + metricKey + numOpsSuffix + " val: " + numOps);
297        assertTrue(p95 >= 0, "metric: " + metricKey + p95Suffix + " val: " + p95);
298        assertTrue(p99 >= 0, "metric: " + metricKey + p99Suffix + " val: " + p99);
299      } else {
300        assertNull(timer);
301      }
302    }
303  }
304
305  private void testRpcCallMetrics(String table, int expectedVal) {
306    final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_";
307    final String rpcFailureCountPrefix =
308      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
309    String metricKey;
310    long metricVal;
311    Counter counter;
312
313    for (String method : new String[] { "Get", "Scan", "Multi" }) {
314      // rpc call count
315      metricKey = rpcCountPrefix + method;
316      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
317      assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal);
318
319      // rpc failure call
320      metricKey = tableMetricsEnabled
321        ? rpcFailureCountPrefix + method + "_" + table
322        : rpcFailureCountPrefix + method;
323      counter = METRICS.getRpcCounters().get(metricKey);
324      metricVal = (counter != null) ? counter.getCount() : 0;
325      if (method.equals("Get")) {
326        // no failure
327        assertEquals(0, metricVal, "metric: " + metricKey + " val: " + metricVal);
328      } else {
329        // has failure
330        assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal);
331      }
332    }
333
334    String method = "Mutate";
335    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
336      // rpc call count
337      metricKey = rpcCountPrefix + method + "(" + mutationType + ")";
338      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
339      assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal);
340
341      // rpc failure call
342      metricKey = tableMetricsEnabled
343        ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table
344        : rpcFailureCountPrefix + method + "(" + mutationType + ")";
345      counter = METRICS.getRpcCounters().get(metricKey);
346      metricVal = (counter != null) ? counter.getCount() : 0;
347      if (mutationType.equals("Put")) {
348        // has failure
349        assertEquals(metricVal, expectedVal, "metric: " + metricKey + " val: " + metricVal);
350      } else {
351        // no failure
352        assertEquals(0, metricVal, "metric: " + metricKey + " val: " + metricVal);
353      }
354    }
355  }
356}