001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023import static org.junit.Assert.assertTrue;
024
025import com.codahale.metrics.Counter;
026import com.codahale.metrics.RatioGauge;
027import com.codahale.metrics.RatioGauge.Ratio;
028import com.codahale.metrics.Timer;
029import java.io.IOException;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.List;
033import java.util.Optional;
034import java.util.concurrent.CompletableFuture;
035import java.util.concurrent.Executors;
036import java.util.concurrent.ThreadPoolExecutor;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.ipc.CallTimeoutException;
041import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.testclassification.ClientTests;
044import org.apache.hadoop.hbase.testclassification.MetricsTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.junit.After;
048import org.junit.Before;
049import org.junit.ClassRule;
050import org.junit.Test;
051import org.junit.experimental.categories.Category;
052import org.junit.runner.RunWith;
053import org.junit.runners.Parameterized;
054import org.junit.runners.Parameterized.Parameter;
055import org.junit.runners.Parameterized.Parameters;
056import org.mockito.Mockito;
057
058import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
070
071@RunWith(Parameterized.class)
072@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
073public class TestMetricsConnection {
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestMetricsConnection.class);
077
078  private static final Configuration conf = new Configuration();
079  private static MetricsConnection METRICS;
080  private static final ThreadPoolExecutor BATCH_POOL =
081    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
082
083  private static final String MOCK_CONN_STR = "mocked-connection";
084
085  @Parameter()
086  public boolean tableMetricsEnabled;
087
088  @Parameters
089  public static List<Boolean> params() {
090    return Arrays.asList(false, true);
091  }
092
093  @Before
094  public void before() {
095    conf.setBoolean(MetricsConnection.CLIENT_SIDE_TABLE_METRICS_ENABLED_KEY, tableMetricsEnabled);
096    METRICS =
097      MetricsConnection.getMetricsConnection(conf, MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
098  }
099
100  @After
101  public void after() {
102    MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
103  }
104
105  @Test
106  public void testMetricsConnectionScopeAsyncClient() throws IOException {
107    Configuration conf = new Configuration();
108    String clusterId = "foo";
109    String scope = "testScope";
110    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
111
112    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
113    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
114    assertTrue("Metrics should be present", metrics.isPresent());
115    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
116      metrics.get().getMetricScope());
117    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
118    impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
119
120    metrics = impl.getConnectionMetrics();
121    assertTrue("Metrics should be present", metrics.isPresent());
122    assertEquals(scope, metrics.get().getMetricScope());
123  }
124
125  @Test
126  public void testMetricsWithMultiConnections() throws IOException {
127    Configuration conf = new Configuration();
128    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
129    conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
130
131    User user = User.getCurrent();
132
133    /* create multiple connections */
134    final int num = 3;
135    AsyncConnectionImpl impl;
136    List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
137    for (int i = 0; i < num; i++) {
138      impl = new AsyncConnectionImpl(conf, null, null, user);
139      connList.add(impl);
140    }
141
142    /* verify metrics presence */
143    impl = connList.get(0);
144    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
145    assertTrue("Metrics should be present", metrics.isPresent());
146
147    /* verify connection count in a shared metrics */
148    long count = metrics.get().getConnectionCount();
149    assertEquals("Failed to verify connection count." + count, count, num);
150
151    /* close some connections */
152    for (int i = 0; i < num - 1; i++) {
153      connList.get(i).close();
154    }
155
156    /* verify metrics presence again */
157    impl = connList.get(num - 1);
158    metrics = impl.getConnectionMetrics();
159    assertTrue("Metrics should be present after some of connections are closed.",
160      metrics.isPresent());
161
162    /* verify count of remaining connections */
163    count = metrics.get().getConnectionCount();
164    assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);
165
166    /* shutdown */
167    impl.close();
168  }
169
170  @Test
171  public void testMetricsConnectionScopeBlockingClient() throws IOException {
172    Configuration conf = new Configuration();
173    String clusterId = "foo";
174    String scope = "testScope";
175    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
176
177    ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class);
178    Mockito.when(mockRegistry.getClusterId())
179      .thenReturn(CompletableFuture.completedFuture(clusterId));
180
181    ConnectionImplementation impl =
182      new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
183    MetricsConnection metrics = impl.getConnectionMetrics();
184    assertNotNull("Metrics should be present", metrics);
185    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.getMetricScope());
186    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
187    impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
188
189    metrics = impl.getConnectionMetrics();
190    assertNotNull("Metrics should be present", metrics);
191    assertEquals(scope, metrics.getMetricScope());
192  }
193
194  @Test
195  public void testStaticMetrics() throws IOException {
196    final byte[] foo = Bytes.toBytes("foo");
197    String table = "TableX";
198    final RegionSpecifier region = RegionSpecifier.newBuilder()
199      .setValue(ByteString.copyFromUtf8(table)).setType(RegionSpecifierType.REGION_NAME).build();
200    final int loop = 5;
201
202    for (int i = 0; i < loop; i++) {
203      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
204        TableName.valueOf(table),
205        GetRequest.newBuilder().setRegion(region).setGet(ProtobufUtil.toGet(new Get(foo))).build(),
206        MetricsConnection.newCallStats(), null);
207      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
208        TableName.valueOf(table),
209        ScanRequest.newBuilder().setRegion(region)
210          .setScan(ProtobufUtil.toScan(new Scan(new Get(foo)))).build(),
211        MetricsConnection.newCallStats(),
212        new RemoteWithExtrasException("java.io.IOException", null, false, false));
213      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
214        TableName.valueOf(table),
215        MultiRequest.newBuilder()
216          .addRegionAction(ClientProtos.RegionAction.newBuilder()
217            .addAction(
218              ClientProtos.Action.newBuilder().setGet(ProtobufUtil.toGet(new Get(foo))).build())
219            .setRegion(region).build())
220          .build(),
221        MetricsConnection.newCallStats(),
222        new CallTimeoutException("test with CallTimeoutException"));
223      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
224        TableName.valueOf(table),
225        MutateRequest.newBuilder()
226          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
227          .setRegion(region).build(),
228        MetricsConnection.newCallStats(), null);
229      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
230        TableName.valueOf(table),
231        MutateRequest.newBuilder()
232          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
233          .setRegion(region).build(),
234        MetricsConnection.newCallStats(), null);
235      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
236        TableName.valueOf(table),
237        MutateRequest.newBuilder()
238          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
239          .setRegion(region).build(),
240        MetricsConnection.newCallStats(), null);
241      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
242        TableName.valueOf(table),
243        MutateRequest.newBuilder()
244          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
245          .build(),
246        MetricsConnection.newCallStats(),
247        new CallTimeoutException("test with CallTimeoutException"));
248    }
249
250    testRpcCallMetrics(table, loop);
251
252    String metricKey;
253    long metricVal;
254    Counter counter;
255
256    // remote exception
257    metricKey = "rpcRemoteExceptions_IOException";
258    counter = METRICS.getRpcCounters().get(metricKey);
259    metricVal = (counter != null) ? counter.getCount() : 0;
260    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop);
261
262    // local exception
263    metricKey = "rpcLocalExceptions_CallTimeoutException";
264    counter = METRICS.getRpcCounters().get(metricKey);
265    metricVal = (counter != null) ? counter.getCount() : 0;
266    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 2);
267
268    // total exception
269    metricKey = "rpcTotalExceptions";
270    counter = METRICS.getRpcCounters().get(metricKey);
271    metricVal = (counter != null) ? counter.getCount() : 0;
272    assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, loop * 3);
273
274    testRpcCallTableMetrics(table, loop);
275
276    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
277      METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
278      METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
279      METRICS.getPutTracker() }) {
280      assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
281      assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
282      assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
283    }
284    RatioGauge executorMetrics =
285      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
286    RatioGauge metaMetrics =
287      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
288    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
289    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
290  }
291
292  private void testRpcCallTableMetrics(String table, int expectedVal) {
293    String metricKey;
294    Timer timer;
295    String numOpsSuffix = "_num_ops";
296    String p95Suffix = "_95th_percentile";
297    String p99Suffix = "_99th_percentile";
298    String service = ClientService.getDescriptor().getName();
299    for (String m : new String[] { "Get", "Scan", "Multi" }) {
300      metricKey = "rpcCallDurationMs_" + service + "_" + m + "_" + table;
301      timer = METRICS.getRpcTimers().get(metricKey);
302      if (tableMetricsEnabled) {
303        long numOps = timer.getCount();
304        double p95 = timer.getSnapshot().get95thPercentile();
305        double p99 = timer.getSnapshot().get99thPercentile();
306        assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal,
307          numOps);
308        assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
309        assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
310      } else {
311        assertNull(timer);
312      }
313    }
314
315    // Distinguish mutate types for mutate method.
316    String mutateMethod = "Mutate";
317    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
318      metricKey = "rpcCallDurationMs_" + service + "_" + mutateMethod + "(" + mutationType + ")"
319        + "_" + table;
320      timer = METRICS.getRpcTimers().get(metricKey);
321      if (tableMetricsEnabled) {
322        long numOps = timer.getCount();
323        double p95 = timer.getSnapshot().get95thPercentile();
324        double p99 = timer.getSnapshot().get99thPercentile();
325        assertEquals("metric: " + metricKey + numOpsSuffix + " val: " + numOps, expectedVal,
326          numOps);
327        assertTrue("metric: " + metricKey + p95Suffix + " val: " + p95, p95 >= 0);
328        assertTrue("metric: " + metricKey + p99Suffix + " val: " + p99, p99 >= 0);
329      } else {
330        assertNull(timer);
331      }
332    }
333  }
334
335  private void testRpcCallMetrics(String table, int expectedVal) {
336    final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_";
337    final String rpcFailureCountPrefix =
338      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
339    String metricKey;
340    long metricVal;
341    Counter counter;
342
343    for (String method : new String[] { "Get", "Scan", "Multi" }) {
344      // rpc call count
345      metricKey = rpcCountPrefix + method;
346      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
347      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
348
349      // rpc failure call
350      metricKey = tableMetricsEnabled
351        ? rpcFailureCountPrefix + method + "_" + table
352        : rpcFailureCountPrefix + method;
353      counter = METRICS.getRpcCounters().get(metricKey);
354      metricVal = (counter != null) ? counter.getCount() : 0;
355      if (method.equals("Get")) {
356        // no failure
357        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal);
358      } else {
359        // has failure
360        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
361      }
362    }
363
364    String method = "Mutate";
365    for (String mutationType : new String[] { "Append", "Delete", "Increment", "Put" }) {
366      // rpc call count
367      metricKey = rpcCountPrefix + method + "(" + mutationType + ")";
368      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
369      assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
370
371      // rpc failure call
372      metricKey = tableMetricsEnabled
373        ? rpcFailureCountPrefix + method + "(" + mutationType + ")" + "_" + table
374        : rpcFailureCountPrefix + method + "(" + mutationType + ")";
375      counter = METRICS.getRpcCounters().get(metricKey);
376      metricVal = (counter != null) ? counter.getCount() : 0;
377      if (mutationType.equals("Put")) {
378        // has failure
379        assertEquals("metric: " + metricKey + " val: " + metricVal, metricVal, expectedVal);
380      } else {
381        // no failure
382        assertEquals("metric: " + metricKey + " val: " + metricVal, 0, metricVal);
383      }
384    }
385  }
386}