001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertNotEquals;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.Optional;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Cell;
031import org.apache.hadoop.hbase.CoprocessorEnvironment;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.ConnectionFactory;
040import org.apache.hadoop.hbase.client.Get;
041import org.apache.hadoop.hbase.client.Mutation;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.RegionLocator;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableDescriptor;
047import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
048import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
049import org.apache.hadoop.hbase.metrics.Counter;
050import org.apache.hadoop.hbase.metrics.Metric;
051import org.apache.hadoop.hbase.metrics.MetricRegistries;
052import org.apache.hadoop.hbase.metrics.MetricRegistry;
053import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
054import org.apache.hadoop.hbase.metrics.Timer;
055import org.apache.hadoop.hbase.regionserver.HRegionServer;
056import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.wal.WALEdit;
061import org.apache.hadoop.hbase.wal.WALKey;
062import org.junit.jupiter.api.AfterAll;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.BeforeEach;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.junit.jupiter.api.TestInfo;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
072import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
073import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
074import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
075
076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
081
082/**
083 * Testing of coprocessor metrics end-to-end.
084 */
085@Tag(CoprocessorTests.TAG)
086@Tag(MediumTests.TAG)
087public class TestCoprocessorMetrics {
088
089  private static final Logger LOG = LoggerFactory.getLogger(TestCoprocessorMetrics.class);
090  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
091
092  private static final byte[] foo = Bytes.toBytes("foo");
093  private static final byte[] bar = Bytes.toBytes("bar");
094
095  private String currentTestName;
096
097  /**
098   * MasterObserver that has a Timer metric for create table operation.
099   */
100  public static class CustomMasterObserver implements MasterCoprocessor, MasterObserver {
101    private Timer createTableTimer;
102    private long start = Long.MIN_VALUE;
103
104    @Override
105    public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
106      TableDescriptor desc, RegionInfo[] regions) throws IOException {
107      // we rely on the fact that there is only 1 instance of our MasterObserver
108      this.start = EnvironmentEdgeManager.currentTime();
109    }
110
111    @Override
112    public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
113      TableDescriptor desc, RegionInfo[] regions) throws IOException {
114      if (this.start > 0) {
115        long time = EnvironmentEdgeManager.currentTime() - start;
116        LOG.info("Create table took: " + time);
117        createTableTimer.updateMillis(time);
118      }
119    }
120
121    @Override
122    public void start(CoprocessorEnvironment env) throws IOException {
123      if (env instanceof MasterCoprocessorEnvironment) {
124        MetricRegistry registry = ((MasterCoprocessorEnvironment) env).getMetricRegistryForMaster();
125
126        createTableTimer = registry.timer("CreateTable");
127      }
128    }
129
130    @Override
131    public Optional<MasterObserver> getMasterObserver() {
132      return Optional.of(this);
133    }
134  }
135
136  /**
137   * RegionServerObserver that has a Counter for rollWAL requests.
138   */
139  public static class CustomRegionServerObserver
140    implements RegionServerCoprocessor, RegionServerObserver {
141    /** This is the Counter metric object to keep track of the current count across invocations */
142    private Counter rollWALCounter;
143
144    @Override
145    public Optional<RegionServerObserver> getRegionServerObserver() {
146      return Optional.of(this);
147    }
148
149    @Override
150    public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
151      throws IOException {
152      // Increment the Counter whenever the coprocessor is called
153      rollWALCounter.increment();
154    }
155
156    @Override
157    public void start(CoprocessorEnvironment env) throws IOException {
158      if (env instanceof RegionServerCoprocessorEnvironment) {
159        MetricRegistry registry =
160          ((RegionServerCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
161
162        if (rollWALCounter == null) {
163          rollWALCounter = registry.counter("rollWALRequests");
164        }
165      }
166    }
167  }
168
169  /**
170   * WALObserver that has a Counter for walEdits written.
171   */
172  public static class CustomWALObserver implements WALCoprocessor, WALObserver {
173    private Counter walEditsCount;
174
175    @Override
176    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
177      RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
178      walEditsCount.increment();
179    }
180
181    @Override
182    public void start(CoprocessorEnvironment env) throws IOException {
183      if (env instanceof WALCoprocessorEnvironment) {
184        MetricRegistry registry =
185          ((WALCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
186
187        if (walEditsCount == null) {
188          walEditsCount = registry.counter("walEditsCount");
189        }
190      }
191    }
192
193    @Override
194    public Optional<WALObserver> getWALObserver() {
195      return Optional.of(this);
196    }
197  }
198
199  /**
200   * RegionObserver that has a Counter for preGet()
201   */
202  public static class CustomRegionObserver implements RegionCoprocessor, RegionObserver {
203    private Counter preGetCounter;
204
205    @Override
206    public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> e, Get get,
207      List<Cell> results) throws IOException {
208      preGetCounter.increment();
209    }
210
211    @Override
212    public Optional<RegionObserver> getRegionObserver() {
213      return Optional.of(this);
214    }
215
216    @Override
217    public void start(CoprocessorEnvironment env) throws IOException {
218      if (env instanceof RegionCoprocessorEnvironment) {
219        MetricRegistry registry =
220          ((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
221
222        if (preGetCounter == null) {
223          preGetCounter = registry.counter("preGetRequests");
224        }
225      }
226    }
227  }
228
229  public static class CustomRegionObserver2 extends CustomRegionObserver {
230  }
231
232  /**
233   * RegionEndpoint to test metrics from endpoint calls
234   */
235  public static class CustomRegionEndpoint extends MultiRowMutationEndpoint {
236
237    private Timer endpointExecution;
238
239    @Override
240    public void mutateRows(RpcController controller, MutateRowsRequest request,
241      RpcCallback<MutateRowsResponse> done) {
242      long start = System.nanoTime();
243      super.mutateRows(controller, request, done);
244      endpointExecution.updateNanos(System.nanoTime() - start);
245    }
246
247    @Override
248    public void start(CoprocessorEnvironment env) throws IOException {
249      super.start(env);
250
251      if (env instanceof RegionCoprocessorEnvironment) {
252        MetricRegistry registry =
253          ((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer();
254
255        if (endpointExecution == null) {
256          endpointExecution = registry.timer("EndpointExecution");
257        }
258      }
259    }
260  }
261
262  @BeforeAll
263  public static void setupBeforeClass() throws Exception {
264    Configuration conf = UTIL.getConfiguration();
265    // inject master, regionserver and WAL coprocessors
266    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, CustomMasterObserver.class.getName());
267    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
268      CustomRegionServerObserver.class.getName());
269    conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, CustomWALObserver.class.getName());
270    conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
271    UTIL.startMiniCluster();
272  }
273
274  @AfterAll
275  public static void teardownAfterClass() throws Exception {
276    UTIL.shutdownMiniCluster();
277  }
278
279  @BeforeEach
280  public void setup(TestInfo testInfo) throws IOException {
281    currentTestName = testInfo.getTestMethod().get().getName();
282    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
283      Admin admin = connection.getAdmin()) {
284      for (TableDescriptor htd : admin.listTableDescriptors()) {
285        UTIL.deleteTable(htd.getTableName());
286      }
287    }
288  }
289
290  @Test
291  public void testMasterObserver() throws IOException {
292    // Find out the MetricRegistry used by the CP using the global registries
293    MetricRegistryInfo info = MetricsCoprocessor
294      .createRegistryInfoForMasterCoprocessor(CustomMasterObserver.class.getName());
295    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
296    assertTrue(registry.isPresent());
297
298    Optional<Metric> metric = registry.get().get("CreateTable");
299    assertTrue(metric.isPresent());
300
301    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
302      Admin admin = connection.getAdmin()) {
303
304      Timer createTableTimer = (Timer) metric.get();
305      long prevCount = createTableTimer.getHistogram().getCount();
306      LOG.info("Creating table");
307      TableDescriptorBuilder tableDescriptorBuilder =
308        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName));
309      ColumnFamilyDescriptor columnFamilyDescriptor =
310        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("foo")).build();
311      tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
312      admin.createTable(tableDescriptorBuilder.build());
313      assertEquals(1, createTableTimer.getHistogram().getCount() - prevCount);
314    }
315  }
316
317  @Test
318  public void testRegionServerObserver() throws IOException {
319    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
320      Admin admin = connection.getAdmin()) {
321      LOG.info("Rolling WALs");
322      admin.rollWALWriter(UTIL.getMiniHBaseCluster().getServerHoldingMeta());
323    }
324
325    // Find out the MetricRegistry used by the CP using the global registries
326    MetricRegistryInfo info = MetricsCoprocessor
327      .createRegistryInfoForRSCoprocessor(CustomRegionServerObserver.class.getName());
328
329    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
330    assertTrue(registry.isPresent());
331
332    Optional<Metric> metric = registry.get().get("rollWALRequests");
333    assertTrue(metric.isPresent());
334
335    Counter rollWalRequests = (Counter) metric.get();
336    assertEquals(1, rollWalRequests.getCount());
337  }
338
339  @Test
340  public void testWALObserver() throws IOException {
341    // Find out the MetricRegistry used by the CP using the global registries
342    MetricRegistryInfo info =
343      MetricsCoprocessor.createRegistryInfoForWALCoprocessor(CustomWALObserver.class.getName());
344
345    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
346    assertTrue(registry.isPresent());
347
348    Optional<Metric> metric = registry.get().get("walEditsCount");
349    assertTrue(metric.isPresent());
350
351    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
352      Admin admin = connection.getAdmin()) {
353      TableDescriptorBuilder tableDescriptorBuilder =
354        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTestName));
355      ColumnFamilyDescriptor columnFamilyDescriptor =
356        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("foo")).build();
357      tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
358      admin.createTable(tableDescriptorBuilder.build());
359
360      Counter rollWalRequests = (Counter) metric.get();
361      long prevCount = rollWalRequests.getCount();
362      assertTrue(prevCount > 0);
363
364      try (Table table = connection.getTable(TableName.valueOf(currentTestName))) {
365        table.put(new Put(foo).addColumn(foo, foo, foo));
366      }
367
368      assertEquals(1, rollWalRequests.getCount() - prevCount);
369    }
370  }
371
372  /**
373   * Helper for below tests
374   */
375  private void assertPreGetRequestsCounter(Class<?> coprocClass) {
376    // Find out the MetricRegistry used by the CP using the global registries
377    MetricRegistryInfo info =
378      MetricsCoprocessor.createRegistryInfoForRegionCoprocessor(coprocClass.getName());
379
380    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
381    assertTrue(registry.isPresent());
382
383    Optional<Metric> metric = registry.get().get("preGetRequests");
384    assertTrue(metric.isPresent());
385
386    Counter preGetRequests = (Counter) metric.get();
387    assertEquals(2, preGetRequests.getCount());
388  }
389
390  @Test
391  public void testRegionObserverSingleRegion() throws IOException {
392    final TableName tableName = TableName.valueOf(currentTestName);
393    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
394      Admin admin = connection.getAdmin()) {
395      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
396        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
397        // add the coprocessor for the region
398        .setCoprocessor(CustomRegionObserver.class.getName()).build());
399      try (Table table = connection.getTable(tableName)) {
400        table.get(new Get(foo));
401        table.get(new Get(foo)); // 2 gets
402      }
403    }
404
405    assertPreGetRequestsCounter(CustomRegionObserver.class);
406  }
407
408  @Test
409  public void testRegionObserverMultiRegion() throws IOException {
410    final TableName tableName = TableName.valueOf(currentTestName);
411    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
412      Admin admin = connection.getAdmin()) {
413      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
414        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
415        // add the coprocessor for the region
416        .setCoprocessor(CustomRegionObserver.class.getName()).build(), new byte[][] { foo });
417      // create with 2 regions
418      try (Table table = connection.getTable(tableName);
419        RegionLocator locator = connection.getRegionLocator(tableName)) {
420        table.get(new Get(bar));
421        table.get(new Get(foo)); // 2 gets to 2 separate regions
422        assertEquals(2, locator.getAllRegionLocations().size());
423        assertNotEquals(locator.getRegionLocation(bar).getRegion(),
424          locator.getRegionLocation(foo).getRegion());
425      }
426    }
427
428    assertPreGetRequestsCounter(CustomRegionObserver.class);
429  }
430
431  @Test
432  public void testRegionObserverMultiTable() throws IOException {
433    final TableName tableName1 = TableName.valueOf(currentTestName + "1");
434    final TableName tableName2 = TableName.valueOf(currentTestName + "2");
435    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
436      Admin admin = connection.getAdmin()) {
437      admin.createTable(TableDescriptorBuilder.newBuilder(tableName1)
438        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
439        // add the coprocessor for the region
440        .setCoprocessor(CustomRegionObserver.class.getName()).build());
441      admin.createTable(TableDescriptorBuilder.newBuilder(tableName2)
442        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
443        // add the coprocessor for the region
444        .setCoprocessor(CustomRegionObserver.class.getName()).build());
445      try (Table table1 = connection.getTable(tableName1);
446        Table table2 = connection.getTable(tableName2)) {
447        table1.get(new Get(bar));
448        table2.get(new Get(foo)); // 2 gets to 2 separate tables
449      }
450    }
451    assertPreGetRequestsCounter(CustomRegionObserver.class);
452  }
453
454  @Test
455  public void testRegionObserverMultiCoprocessor() throws IOException {
456    final TableName tableName = TableName.valueOf(currentTestName);
457    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
458      Admin admin = connection.getAdmin()) {
459      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
460        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
461        // add the coprocessor for the region. We add two different coprocessors
462        .setCoprocessor(CustomRegionObserver.class.getName())
463        .setCoprocessor(CustomRegionObserver2.class.getName()).build());
464      try (Table table = connection.getTable(tableName)) {
465        table.get(new Get(foo));
466        table.get(new Get(foo)); // 2 gets
467      }
468    }
469
470    // we will have two counters coming from two coprocs, in two different MetricRegistries
471    assertPreGetRequestsCounter(CustomRegionObserver.class);
472    assertPreGetRequestsCounter(CustomRegionObserver2.class);
473  }
474
475  @Test
476  public void testRegionObserverAfterRegionClosed() throws IOException {
477    final TableName tableName = TableName.valueOf(currentTestName);
478    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
479      Admin admin = connection.getAdmin()) {
480      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
481        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
482        // add the coprocessor for the region
483        .setCoprocessor(CustomRegionObserver.class.getName()).build(), new byte[][] { foo });
484      // create with 2 regions
485      try (Table table = connection.getTable(tableName)) {
486        table.get(new Get(foo));
487        table.get(new Get(foo)); // 2 gets
488      }
489
490      assertPreGetRequestsCounter(CustomRegionObserver.class);
491
492      // close one of the regions
493      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
494        HRegionLocation loc = locator.getRegionLocation(foo);
495        admin.unassign(loc.getRegion().getEncodedNameAsBytes(), true);
496
497        HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(loc.getServerName());
498        UTIL.waitFor(30000, () -> server.getOnlineRegion(loc.getRegion().getRegionName()) == null);
499        assertNull(server.getOnlineRegion(loc.getRegion().getRegionName()));
500      }
501
502      // with only 1 region remaining, we should still be able to find the Counter
503      assertPreGetRequestsCounter(CustomRegionObserver.class);
504
505      // close the table
506      admin.disableTable(tableName);
507
508      MetricRegistryInfo info = MetricsCoprocessor
509        .createRegistryInfoForRegionCoprocessor(CustomRegionObserver.class.getName());
510
511      // ensure that MetricRegistry is deleted
512      Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
513      assertFalse(registry.isPresent());
514    }
515  }
516
517  @Test
518  public void testRegionObserverEndpoint() throws IOException, ServiceException {
519    final TableName tableName = TableName.valueOf(currentTestName);
520    try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
521      Admin admin = connection.getAdmin()) {
522      admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
523        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(foo))
524        // add the coprocessor for the region
525        .setCoprocessor(CustomRegionEndpoint.class.getName()).build());
526
527      try (Table table = connection.getTable(tableName)) {
528        List<Mutation> mutations = Lists.newArrayList(new Put(foo), new Put(bar));
529        MutateRowsRequest.Builder mrmBuilder = MutateRowsRequest.newBuilder();
530
531        for (Mutation mutation : mutations) {
532          mrmBuilder.addMutationRequest(
533            ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, mutation));
534        }
535
536        CoprocessorRpcChannel channel = table.coprocessorService(bar);
537        MultiRowMutationService.BlockingInterface service =
538          MultiRowMutationService.newBlockingStub(channel);
539        MutateRowsRequest mrm = mrmBuilder.build();
540        service.mutateRows(null, mrm);
541      }
542    }
543
544    // Find out the MetricRegistry used by the CP using the global registries
545    MetricRegistryInfo info = MetricsCoprocessor
546      .createRegistryInfoForRegionCoprocessor(CustomRegionEndpoint.class.getName());
547
548    Optional<MetricRegistry> registry = MetricRegistries.global().get(info);
549    assertTrue(registry.isPresent());
550
551    Optional<Metric> metric = registry.get().get("EndpointExecution");
552    assertTrue(metric.isPresent());
553
554    Timer endpointExecutions = (Timer) metric.get();
555    assertEquals(1, endpointExecutions.getHistogram().getCount());
556  }
557}