001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import com.google.protobuf.RpcCallback;
027import com.google.protobuf.RpcController;
028import com.google.protobuf.ServiceException;
029import java.io.IOException;
030import java.util.List;
031import java.util.Optional;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CoprocessorEnvironment;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HColumnDescriptor;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.HTableDescriptor;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.ConnectionFactory;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Mutation;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionLocator;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
052import org.apache.hadoop.hbase.metrics.Counter;
053import org.apache.hadoop.hbase.metrics.Metric;
054import org.apache.hadoop.hbase.metrics.MetricRegistries;
055import org.apache.hadoop.hbase.metrics.MetricRegistry;
056import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
057import org.apache.hadoop.hbase.metrics.Timer;
058import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
059import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
060import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
061import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
062import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
063import org.apache.hadoop.hbase.regionserver.HRegionServer;
064import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
065import org.apache.hadoop.hbase.testclassification.MediumTests;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
068import org.apache.hadoop.hbase.wal.WALEdit;
069import org.apache.hadoop.hbase.wal.WALKey;
070import org.junit.AfterClass;
071import org.junit.Before;
072import org.junit.BeforeClass;
073import org.junit.ClassRule;
074import org.junit.Rule;
075import org.junit.Test;
076import org.junit.experimental.categories.Category;
077import org.junit.rules.TestName;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
082
083/**
084 * Testing of coprocessor metrics end-to-end.
085 */
086@Category({ CoprocessorTests.class, MediumTests.class })
087public class TestCoprocessorMetrics {
088
089  @ClassRule
090  public static final HBaseClassTestRule CLASS_RULE =
091    HBaseClassTestRule.forClass(TestCoprocessorMetrics.class);
092
093  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorMetrics.class);
094  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
095
096  private static final byte[] foo = Bytes.toBytes("foo");
097  private static final byte[] bar = Bytes.toBytes("bar");
098
099  @Rule
100  public TestName name = new TestName();
101
102  /**
103   * MasterObserver that has a Timer metric for create table operation.
104   */
105  public static class CustomMasterObserver implements MasterCoprocessor, MasterObserver {
106    private Timer createTableTimer;
107    private long start = Long.MIN_VALUE;
108
109    @Override
110    public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
111      TableDescriptor desc, RegionInfo[] regions) throws IOException {
112      // we rely on the fact that there is only 1 instance of our MasterObserver
113      this.start = EnvironmentEdgeManager.currentTime();
114    }
115
116    @Override
117    public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
118      TableDescriptor desc, RegionInfo[] regions) throws IOException {
119      if (this.start > 0) {
120        long time = EnvironmentEdgeManager.currentTime() - start;
121        LOG.info("Create table took: " + time);
122        createTableTimer.updateMillis(time);
123      }
124    }
125
126    @Override
127    public void start(CoprocessorEnvironment env) throws IOException {
128      if (env instanceof MasterCoprocessorEnvironment) {
129        MetricRegistry registry = ((MasterCoprocessorEnvironment) env).getMetricRegistryForMaster();
130
131        createTableTimer = registry.timer("CreateTable");
132      }
133    }
134
135    @Override
136    public Optional<MasterObserver> getMasterObserver() {
137      return Optional.of(this);
138    }
139  }
140
141  /**
142   * RegionServerObserver that has a Counter for rollWAL requests.
143   */
144  public static class CustomRegionServerObserver
145    implements RegionServerCoprocessor, RegionServerObserver {
146    /** This is the Counter metric object to keep track of the current count across invocations */
147    private Counter rollWALCounter;
148
149    @Override
150    public Optional<RegionServerObserver> getRegionServerObserver() {
151      return Optional.of(this);
152    }
153
154    @Override
155    public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
156      throws IOException {
157      // Increment the Counter whenever the coprocessor is called
158      rollWALCounter.increment();
159    }
160
161    @Override
162    public void start(CoprocessorEnvironment env) throws IOException {
163      if (env instanceof RegionServerCoprocessorEnvironment) {
164        MetricRegistry registry =
165          ((RegionServerCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
166
167        if (rollWALCounter == null) {
168          rollWALCounter = registry.counter("rollWALRequests");
169        }
170      }
171    }
172  }
173
174  /**
175   * WALObserver that has a Counter for walEdits written.
176   */
177  public static class CustomWALObserver implements WALCoprocessor, WALObserver {
178    private Counter walEditsCount;
179
180    @Override
181    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
182      RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
183      walEditsCount.increment();
184    }
185
186    @Override
187    public void start(CoprocessorEnvironment env) throws IOException {
188      if (env instanceof WALCoprocessorEnvironment) {
189        MetricRegistry registry =
190          ((WALCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
191
192        if (walEditsCount == null) {
193          walEditsCount = registry.counter("walEditsCount");
194        }
195      }
196    }
197
198    @Override
199    public Optional<WALObserver> getWALObserver() {
200      return Optional.of(this);
201    }
202  }
203
204  /**
205   * RegionObserver that has a Counter for preGet()
206   */
207  public static class CustomRegionObserver implements RegionCoprocessor, RegionObserver {
208    private Counter preGetCounter;
209
210    @Override
211    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
212      List<Cell> results) throws IOException {
213      preGetCounter.increment();
214    }
215
216    @Override
217    public Optional<RegionObserver> getRegionObserver() {
218      return Optional.of(this);
219    }
220
221    @Override
222    public void start(CoprocessorEnvironment env) throws IOException {
223      if (env instanceof RegionCoprocessorEnvironment) {
224        MetricRegistry registry =
225          ((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
226
227        if (preGetCounter == null) {
228          preGetCounter = registry.counter("preGetRequests");
229        }
230      }
231    }
232  }
233
234  public static class CustomRegionObserver2 extends CustomRegionObserver {
235  }
236
237  /**
238   * RegionEndpoint to test metrics from endpoint calls
239   */
240  public static class CustomRegionEndpoint extends MultiRowMutationEndpoint {
241
242    private Timer endpointExecution;
243
244    @Override
245    public void mutateRows(RpcController controller, MutateRowsRequest request,
246      RpcCallback<MutateRowsResponse> done) {
247      long start = System.nanoTime();
248      super.mutateRows(controller, request, done);
249      endpointExecution.updateNanos(System.nanoTime() - start);
250    }
251
252    @Override
253    public void start(CoprocessorEnvironment env) throws IOException {
254      super.start(env);
255
256      if (env instanceof RegionCoprocessorEnvironment) {
257        MetricRegistry registry =
258          ((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
259
260        if (endpointExecution == null) {
261          endpointExecution = registry.timer("EndpointExecution");
262        }
263      }
264    }
265  }
266
267  @BeforeClass
268  public static void setupBeforeClass() throws Exception {
269    Configuration conf = UTIL.getConfiguration();
270    // inject master, regionserver and WAL coprocessors
271    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, CustomMasterObserver.class.getName());
272    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
273      CustomRegionServerObserver.class.getName());
274    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, CustomWALObserver.class.getName());
275    conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
276    UTIL.startMiniCluster();
277  }
278
279  @AfterClass
280  public static void teardownAfterClass() throws Exception {
281    UTIL.shutdownMiniCluster();
282  }
283
284  @Before
285  public void setup() throws IOException {
286    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
287      Admin admin = connection.getAdmin()) {
288      for (HTableDescriptor htd : admin.listTables()) {
289        UTIL.deleteTable(htd.getTableName());
290      }
291    }
292  }
293
294  @Test
295  public void testMasterObserver() throws IOException {
296    // Find out the MetricRegistry used by the CP using the global registries
297    MetricRegistryInfo info = MetricsCoprocessor
298      .createRegistryInfoForMasterCoprocessor(CustomMasterObserver.class.getName());
299    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
300    assertTrue(registry.isPresent());
301
302    Optional<Metric> metric = registry.get().get("CreateTable");
303    assertTrue(metric.isPresent());
304
305    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
306      Admin admin = connection.getAdmin()) {
307
308      Timer createTableTimer = (Timer) metric.get();
309      long prevCount = createTableTimer.getHistogram().getCount();
310      LOG.info("Creating table");
311      admin.createTable(new HTableDescriptor(TableName.valueOf(name.getMethodName()))
312        .addFamily(new HColumnDescriptor("foo")));
313
314      assertEquals(1, createTableTimer.getHistogram().getCount() - prevCount);
315    }
316  }
317
318  @Test
319  public void testRegionServerObserver() throws IOException {
320    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
321      Admin admin = connection.getAdmin()) {
322      LOG.info("Rolling WALs");
323      admin.rollWALWriter(UTIL.getMiniHBaseCluster().getServerHoldingMeta());
324    }
325
326    // Find out the MetricRegistry used by the CP using the global registries
327    MetricRegistryInfo info = MetricsCoprocessor
328      .createRegistryInfoForRSCoprocessor(CustomRegionServerObserver.class.getName());
329
330    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
331    assertTrue(registry.isPresent());
332
333    Optional<Metric> metric = registry.get().get("rollWALRequests");
334    assertTrue(metric.isPresent());
335
336    Counter rollWalRequests = (Counter) metric.get();
337    assertEquals(1, rollWalRequests.getCount());
338  }
339
340  @Test
341  public void testWALObserver() throws IOException {
342    // Find out the MetricRegistry used by the CP using the global registries
343    MetricRegistryInfo info =
344      MetricsCoprocessor.createRegistryInfoForWALCoprocessor(CustomWALObserver.class.getName());
345
346    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
347    assertTrue(registry.isPresent());
348
349    Optional<Metric> metric = registry.get().get("walEditsCount");
350    assertTrue(metric.isPresent());
351
352    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
353      Admin admin = connection.getAdmin()) {
354      admin.createTable(new HTableDescriptor(TableName.valueOf(name.getMethodName()))
355        .addFamily(new HColumnDescriptor("foo")));
356
357      Counter rollWalRequests = (Counter) metric.get();
358      long prevCount = rollWalRequests.getCount();
359      assertTrue(prevCount > 0);
360
361      try (Table table = connection.getTable(TableName.valueOf(name.getMethodName()))) {
362        table.put(new Put(foo).addColumn(foo, foo, foo));
363      }
364
365      assertEquals(1, rollWalRequests.getCount() - prevCount);
366    }
367  }
368
369  /**
370   * Helper for below tests
371   */
372  private void assertPreGetRequestsCounter(Class<?> coprocClass) {
373    // Find out the MetricRegistry used by the CP using the global registries
374    MetricRegistryInfo info =
375      MetricsCoprocessor.createRegistryInfoForRegionCoprocessor(coprocClass.getName());
376
377    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
378    assertTrue(registry.isPresent());
379
380    Optional<Metric> metric = registry.get().get("preGetRequests");
381    assertTrue(metric.isPresent());
382
383    Counter preGetRequests = (Counter) metric.get();
384    assertEquals(2, preGetRequests.getCount());
385  }
386
387  @Test
388  public void testRegionObserverSingleRegion() throws IOException {
389    final TableName tableName = TableName.valueOf(name.getMethodName());
390    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
391      Admin admin = connection.getAdmin()) {
392      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(foo))
393        // add the coprocessor for the region
394        .addCoprocessor(CustomRegionObserver.class.getName()));
395      try (Table table = connection.getTable(tableName)) {
396        table.get(new Get(foo));
397        table.get(new Get(foo)); // 2 gets
398      }
399    }
400
401    assertPreGetRequestsCounter(CustomRegionObserver.class);
402  }
403
404  @Test
405  public void testRegionObserverMultiRegion() throws IOException {
406    final TableName tableName = TableName.valueOf(name.getMethodName());
407    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
408      Admin admin = connection.getAdmin()) {
409      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(foo))
410        // add the coprocessor for the region
411        .addCoprocessor(CustomRegionObserver.class.getName()), new byte[][] { foo }); // create with
412                                                                                      // 2 regions
413      try (Table table = connection.getTable(tableName);
414        RegionLocator locator = connection.getRegionLocator(tableName)) {
415        table.get(new Get(bar));
416        table.get(new Get(foo)); // 2 gets to 2 separate regions
417        assertEquals(2, locator.getAllRegionLocations().size());
418        assertNotEquals(locator.getRegionLocation(bar).getRegionInfo(),
419          locator.getRegionLocation(foo).getRegionInfo());
420      }
421    }
422
423    assertPreGetRequestsCounter(CustomRegionObserver.class);
424  }
425
426  @Test
427  public void testRegionObserverMultiTable() throws IOException {
428    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
429    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
430    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
431      Admin admin = connection.getAdmin()) {
432      admin.createTable(new HTableDescriptor(tableName1).addFamily(new HColumnDescriptor(foo))
433        // add the coprocessor for the region
434        .addCoprocessor(CustomRegionObserver.class.getName()));
435      admin.createTable(new HTableDescriptor(tableName2).addFamily(new HColumnDescriptor(foo))
436        // add the coprocessor for the region
437        .addCoprocessor(CustomRegionObserver.class.getName()));
438      try (Table table1 = connection.getTable(tableName1);
439        Table table2 = connection.getTable(tableName2)) {
440        table1.get(new Get(bar));
441        table2.get(new Get(foo)); // 2 gets to 2 separate tables
442      }
443    }
444    assertPreGetRequestsCounter(CustomRegionObserver.class);
445  }
446
447  @Test
448  public void testRegionObserverMultiCoprocessor() throws IOException {
449    final TableName tableName = TableName.valueOf(name.getMethodName());
450    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
451      Admin admin = connection.getAdmin()) {
452      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(foo))
453        // add the coprocessor for the region. We add two different coprocessors
454        .addCoprocessor(CustomRegionObserver.class.getName())
455        .addCoprocessor(CustomRegionObserver2.class.getName()));
456      try (Table table = connection.getTable(tableName)) {
457        table.get(new Get(foo));
458        table.get(new Get(foo)); // 2 gets
459      }
460    }
461
462    // we will have two counters coming from two coprocs, in two different MetricRegistries
463    assertPreGetRequestsCounter(CustomRegionObserver.class);
464    assertPreGetRequestsCounter(CustomRegionObserver2.class);
465  }
466
467  @Test
468  public void testRegionObserverAfterRegionClosed() throws IOException {
469    final TableName tableName = TableName.valueOf(name.getMethodName());
470    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
471      Admin admin = connection.getAdmin()) {
472      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(foo))
473        // add the coprocessor for the region
474        .addCoprocessor(CustomRegionObserver.class.getName()), new byte[][] { foo }); // create with
475                                                                                      // 2 regions
476      try (Table table = connection.getTable(tableName)) {
477        table.get(new Get(foo));
478        table.get(new Get(foo)); // 2 gets
479      }
480
481      assertPreGetRequestsCounter(CustomRegionObserver.class);
482
483      // close one of the regions
484      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
485        HRegionLocation loc = locator.getRegionLocation(foo);
486        admin.unassign(loc.getRegionInfo().getEncodedNameAsBytes(), true);
487
488        HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(loc.getServerName());
489        UTIL.waitFor(30000,
490          () -> server.getOnlineRegion(loc.getRegionInfo().getRegionName()) == null);
491        assertNull(server.getOnlineRegion(loc.getRegionInfo().getRegionName()));
492      }
493
494      // with only 1 region remaining, we should still be able to find the Counter
495      assertPreGetRequestsCounter(CustomRegionObserver.class);
496
497      // close the table
498      admin.disableTable(tableName);
499
500      MetricRegistryInfo info = MetricsCoprocessor
501        .createRegistryInfoForRegionCoprocessor(CustomRegionObserver.class.getName());
502
503      // ensure that MetricRegistry is deleted
504      Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
505      assertFalse(registry.isPresent());
506    }
507  }
508
509  @Test
510  public void testRegionObserverEndpoint() throws IOException, ServiceException {
511    final TableName tableName = TableName.valueOf(name.getMethodName());
512    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
513      Admin admin = connection.getAdmin()) {
514      admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(foo))
515        // add the coprocessor for the region
516        .addCoprocessor(CustomRegionEndpoint.class.getName()));
517
518      try (Table table = connection.getTable(tableName)) {
519        List<Mutation> mutations = Lists.newArrayList(new Put(foo), new Put(bar));
520        MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
521
522        for (Mutation mutation : mutations) {
523          mrmBuilder.addMutationRequest(
524            ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
525        }
526
527        CoprocessorRpcChannel channel = table.coprocessorService(bar);
528        MultiRowMutationService.BlockingInterface service =
529          MultiRowMutationService.newBlockingStub(channel);
530        MutateRowsRequest mrm = mrmBuilder.build();
531        service.mutateRows(null, mrm);
532      }
533    }
534
535    // Find out the MetricRegistry used by the CP using the global registries
536    MetricRegistryInfo info = MetricsCoprocessor
537      .createRegistryInfoForRegionCoprocessor(CustomRegionEndpoint.class.getName());
538
539    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
540    assertTrue(registry.isPresent());
541
542    Optional<Metric> metric = registry.get().get("EndpointExecution");
543    assertTrue(metric.isPresent());
544
545    Timer endpointExecutions = (Timer) metric.get();
546    assertEquals(1, endpointExecutions.getHistogram().getCount());
547  }
548}