001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023
024import com.codahale.metrics.RatioGauge;
025import com.codahale.metrics.RatioGauge.Ratio;
026import java.io.IOException;
027import java.util.Optional;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ThreadPoolExecutor;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.security.User;
034import org.apache.hadoop.hbase.testclassification.ClientTests;
035import org.apache.hadoop.hbase.testclassification.MetricsTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.junit.AfterClass;
039import org.junit.BeforeClass;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.mockito.Mockito;
044
045import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
046
047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
056
057@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
058public class TestMetricsConnection {
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestMetricsConnection.class);
062
063  private static MetricsConnection METRICS;
064  private static final ThreadPoolExecutor BATCH_POOL =
065    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
066
067  @BeforeClass
068  public static void beforeClass() {
069    METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () -> null);
070  }
071
072  @AfterClass
073  public static void afterClass() {
074    METRICS.shutdown();
075  }
076
077  @Test
078  public void testMetricsConnectionScopeAsyncClient() throws IOException {
079    Configuration conf = new Configuration();
080    String clusterId = "foo";
081    String scope = "testScope";
082    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
083
084    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
085    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
086    assertTrue("Metrics should be present", metrics.isPresent());
087    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.get().scope);
088    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
089    impl = new AsyncConnectionImpl(conf, null, "foo", User.getCurrent());
090
091    metrics = impl.getConnectionMetrics();
092    assertTrue("Metrics should be present", metrics.isPresent());
093    assertEquals(scope, metrics.get().scope);
094  }
095
096  @Test
097  public void testMetricsConnectionScopeBlockingClient() throws IOException {
098    Configuration conf = new Configuration();
099    String clusterId = "foo";
100    String scope = "testScope";
101    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
102
103    ConnectionRegistry mockRegistry = Mockito.mock(ConnectionRegistry.class);
104    Mockito.when(mockRegistry.getClusterId())
105      .thenReturn(CompletableFuture.completedFuture(clusterId));
106
107    ConnectionImplementation impl =
108      new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
109    MetricsConnection metrics = impl.getConnectionMetrics();
110    assertNotNull("Metrics should be present", metrics);
111    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()), metrics.scope);
112    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
113    impl = new ConnectionImplementation(conf, null, User.getCurrent(), mockRegistry);
114
115    metrics = impl.getConnectionMetrics();
116    assertNotNull("Metrics should be present", metrics);
117    assertEquals(scope, metrics.scope);
118  }
119
120  @Test
121  public void testStaticMetrics() throws IOException {
122    final byte[] foo = Bytes.toBytes("foo");
123    final RegionSpecifier region = RegionSpecifier.newBuilder().setValue(ByteString.EMPTY)
124      .setType(RegionSpecifierType.REGION_NAME).build();
125    final int loop = 5;
126
127    for (int i = 0; i < loop; i++) {
128      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
129        GetRequest.getDefaultInstance(), MetricsConnection.newCallStats());
130      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
131        ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats());
132      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
133        MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats());
134      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
135        MutateRequest.newBuilder()
136          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
137          .setRegion(region).build(),
138        MetricsConnection.newCallStats());
139      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
140        MutateRequest.newBuilder()
141          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
142          .setRegion(region).build(),
143        MetricsConnection.newCallStats());
144      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
145        MutateRequest.newBuilder()
146          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
147          .setRegion(region).build(),
148        MetricsConnection.newCallStats());
149      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
150        MutateRequest.newBuilder()
151          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
152          .build(),
153        MetricsConnection.newCallStats());
154    }
155    for (String method : new String[] { "Get", "Scan", "Mutate" }) {
156      final String metricKey = "rpcCount_" + ClientService.getDescriptor().getName() + "_" + method;
157      final long metricVal = METRICS.rpcCounters.get(metricKey).getCount();
158      assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
159    }
160    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] { METRICS.getTracker,
161      METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker, METRICS.deleteTracker,
162      METRICS.incrementTracker, METRICS.putTracker }) {
163      assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
164      assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
165      assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
166    }
167    RatioGauge executorMetrics =
168      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
169    RatioGauge metaMetrics =
170      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
171    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
172    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
173  }
174}