001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import com.codahale.metrics.Counter;
024import com.codahale.metrics.RatioGauge;
025import com.codahale.metrics.RatioGauge.Ratio;
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Optional;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadPoolExecutor;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.ipc.CallTimeoutException;
035import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.testclassification.ClientTests;
038import org.apache.hadoop.hbase.testclassification.MetricsTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
048
049import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
058
059@Category({ ClientTests.class, MetricsTests.class, SmallTests.class })
060public class TestMetricsConnection {
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestMetricsConnection.class);
064
065  private static MetricsConnection METRICS;
066  private static final ThreadPoolExecutor BATCH_POOL =
067    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
068
069  private static final String MOCK_CONN_STR = "mocked-connection";
070
071  @BeforeClass
072  public static void beforeClass() {
073    METRICS = MetricsConnection.getMetricsConnection(MOCK_CONN_STR, () -> BATCH_POOL, () -> null);
074  }
075
076  @AfterClass
077  public static void afterClass() {
078    MetricsConnection.deleteMetricsConnection(MOCK_CONN_STR);
079  }
080
081  @Test
082  public void testMetricsConnectionScope() throws IOException {
083    Configuration conf = new Configuration();
084    String clusterId = "foo";
085    String scope = "testScope";
086    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
087
088    AsyncConnectionImpl impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
089    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
090    assertTrue("Metrics should be present", metrics.isPresent());
091    assertEquals(clusterId + "@" + Integer.toHexString(impl.hashCode()),
092      metrics.get().getMetricScope());
093    conf.set(MetricsConnection.METRICS_SCOPE_KEY, scope);
094    impl = new AsyncConnectionImpl(conf, null, "foo", null, User.getCurrent());
095
096    metrics = impl.getConnectionMetrics();
097    assertTrue("Metrics should be present", metrics.isPresent());
098    assertEquals(scope, metrics.get().getMetricScope());
099  }
100
101  @Test
102  public void testMetricsWithMutiConnections() throws IOException {
103    Configuration conf = new Configuration();
104    conf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
105    conf.set(MetricsConnection.METRICS_SCOPE_KEY, "unit-test");
106
107    User user = User.getCurrent();
108
109    /* create multiple connections */
110    final int num = 3;
111    AsyncConnectionImpl impl;
112    List<AsyncConnectionImpl> connList = new ArrayList<AsyncConnectionImpl>();
113    for (int i = 0; i < num; i++) {
114      impl = new AsyncConnectionImpl(conf, null, null, null, user);
115      connList.add(impl);
116    }
117
118    /* verify metrics presence */
119    impl = connList.get(0);
120    Optional<MetricsConnection> metrics = impl.getConnectionMetrics();
121    assertTrue("Metrics should be present", metrics.isPresent());
122
123    /* verify connection count in a shared metrics */
124    long count = metrics.get().getConnectionCount();
125    assertEquals("Failed to verify connection count." + count, count, num);
126
127    /* close some connections */
128    for (int i = 0; i < num - 1; i++) {
129      connList.get(i).close();
130    }
131
132    /* verify metrics presence again */
133    impl = connList.get(num - 1);
134    metrics = impl.getConnectionMetrics();
135    assertTrue("Metrics should be present after some of connections are closed.",
136      metrics.isPresent());
137
138    /* verify count of remaining connections */
139    count = metrics.get().getConnectionCount();
140    assertEquals("Connection count suppose to be 1 but got: " + count, count, 1);
141
142    /* shutdown */
143    impl.close();
144  }
145
146  @Test
147  public void testStaticMetrics() throws IOException {
148    final byte[] foo = Bytes.toBytes("foo");
149    final RegionSpecifier region = RegionSpecifier.newBuilder().setValue(ByteString.EMPTY)
150      .setType(RegionSpecifierType.REGION_NAME).build();
151    final int loop = 5;
152
153    for (int i = 0; i < loop; i++) {
154      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
155        GetRequest.getDefaultInstance(), MetricsConnection.newCallStats(), null);
156      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Scan"),
157        ScanRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
158        new RemoteWithExtrasException("java.io.IOException", null, false, false));
159      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Multi"),
160        MultiRequest.getDefaultInstance(), MetricsConnection.newCallStats(),
161        new CallTimeoutException("test with CallTimeoutException"));
162      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
163        MutateRequest.newBuilder()
164          .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
165          .setRegion(region).build(),
166        MetricsConnection.newCallStats(), null);
167      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
168        MutateRequest.newBuilder()
169          .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
170          .setRegion(region).build(),
171        MetricsConnection.newCallStats(), null);
172      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
173        MutateRequest.newBuilder()
174          .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
175          .setRegion(region).build(),
176        MetricsConnection.newCallStats(), null);
177      METRICS.updateRpc(ClientService.getDescriptor().findMethodByName("Mutate"),
178        MutateRequest.newBuilder()
179          .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo))).setRegion(region)
180          .build(),
181        MetricsConnection.newCallStats(), null);
182    }
183
184    final String rpcCountPrefix = "rpcCount_" + ClientService.getDescriptor().getName() + "_";
185    final String rpcFailureCountPrefix =
186      "rpcFailureCount_" + ClientService.getDescriptor().getName() + "_";
187    String metricKey;
188    long metricVal;
189    Counter counter;
190
191    for (String method : new String[] { "Get", "Scan", "Multi", "Mutate" }) {
192      metricKey = rpcCountPrefix + method;
193      metricVal = METRICS.getRpcCounters().get(metricKey).getCount();
194      assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal >= loop);
195
196      metricKey = rpcFailureCountPrefix + method;
197      counter = METRICS.getRpcCounters().get(metricKey);
198      metricVal = (counter != null) ? counter.getCount() : 0;
199      if (method.equals("Get") || method.equals("Mutate")) {
200        // no failure
201        assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == 0);
202      } else {
203        // has failure
204        assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == loop);
205      }
206    }
207
208    // remote exception
209    metricKey = "rpcRemoteExceptions_IOException";
210    counter = METRICS.getRpcCounters().get(metricKey);
211    metricVal = (counter != null) ? counter.getCount() : 0;
212    assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == loop);
213
214    // local exception
215    metricKey = "rpcLocalExceptions_CallTimeoutException";
216    counter = METRICS.getRpcCounters().get(metricKey);
217    metricVal = (counter != null) ? counter.getCount() : 0;
218    assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == loop);
219
220    // total exception
221    metricKey = "rpcTotalExceptions";
222    counter = METRICS.getRpcCounters().get(metricKey);
223    metricVal = (counter != null) ? counter.getCount() : 0;
224    assertTrue("metric: " + metricKey + " val: " + metricVal, metricVal == loop * 2);
225
226    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
227      METRICS.getGetTracker(), METRICS.getScanTracker(), METRICS.getMultiTracker(),
228      METRICS.getAppendTracker(), METRICS.getDeleteTracker(), METRICS.getIncrementTracker(),
229      METRICS.getPutTracker() }) {
230      assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.getCount());
231      assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.getCount());
232      assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.getCount());
233    }
234    RatioGauge executorMetrics =
235      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getExecutorPoolName());
236    RatioGauge metaMetrics =
237      (RatioGauge) METRICS.getMetricRegistry().getMetrics().get(METRICS.getMetaPoolName());
238    assertEquals(Ratio.of(0, 3).getValue(), executorMetrics.getValue(), 0);
239    assertEquals(Double.NaN, metaMetrics.getValue(), 0);
240  }
241}