001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.fail;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ClusterMetrics.Option;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.RegionMetrics;
036import org.apache.hadoop.hbase.ServerMetrics;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.Append;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.RowMutations;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.coprocessor.ObserverContext;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
056import org.apache.hadoop.hbase.coprocessor.RegionObserver;
057import org.apache.hadoop.hbase.filter.BinaryComparator;
058import org.apache.hadoop.hbase.filter.RowFilter;
059import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
060import org.apache.hadoop.hbase.testclassification.MediumTests;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.junit.jupiter.api.AfterAll;
063import org.junit.jupiter.api.BeforeAll;
064import org.junit.jupiter.api.Disabled;
065import org.junit.jupiter.api.Tag;
066import org.junit.jupiter.api.Test;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@Disabled // Depends on Master being able to host regions. Needs fixing.
071@Tag(MediumTests.TAG)
072public class TestRegionServerReadRequestMetrics {
073
074  private static final Logger LOG =
075    LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class);
076  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
077  private static final TableName TABLE_NAME = TableName.valueOf("test");
078  private static final byte[] CF1 = Bytes.toBytes("c1");
079  private static final byte[] CF2 = Bytes.toBytes("c2");
080
081  private static final byte[] ROW1 = Bytes.toBytes("a");
082  private static final byte[] ROW2 = Bytes.toBytes("b");
083  private static final byte[] ROW3 = Bytes.toBytes("c");
084  private static final byte[] COL1 = Bytes.toBytes("q1");
085  private static final byte[] COL2 = Bytes.toBytes("q2");
086  private static final byte[] COL3 = Bytes.toBytes("q3");
087  private static final byte[] VAL1 = Bytes.toBytes("v1");
088  private static final byte[] VAL2 = Bytes.toBytes("v2");
089  private static final byte[] VAL3 = Bytes.toBytes(0L);
090
091  private static final int MAX_TRY = 20;
092  private static final int SLEEP_MS = 100;
093  private static final int TTL = 1;
094
095  private static Admin admin;
096  private static Collection<ServerName> serverNames;
097  private static Table table;
098  private static RegionInfo regionInfo;
099
100  private static Map<Metric, Long> requestsMap = new HashMap<>();
101  private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
102
103  @BeforeAll
104  public static void setUpOnce() throws Exception {
105    TEST_UTIL.startMiniCluster();
106    admin = TEST_UTIL.getAdmin();
107    serverNames =
108      admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet();
109    table = createTable();
110    putData();
111    List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
112    assertEquals(1, regions.size(), "Table " + TABLE_NAME + " should have 1 region");
113    regionInfo = regions.get(0);
114
115    for (Metric metric : Metric.values()) {
116      requestsMap.put(metric, 0L);
117      requestsMapPrev.put(metric, 0L);
118    }
119  }
120
121  private static Table createTable() throws IOException {
122    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
123    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
124    builder
125      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build());
126    admin.createTable(builder.build());
127    return TEST_UTIL.getConnection().getTable(TABLE_NAME);
128  }
129
130  private static void testReadRequests(long resultCount, long expectedReadRequests,
131    long expectedFilteredReadRequests) throws IOException, InterruptedException {
132    updateMetricsMap();
133    System.out.println("requestsMapPrev = " + requestsMapPrev);
134    System.out.println("requestsMap = " + requestsMap);
135
136    assertEquals(expectedReadRequests,
137      requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ));
138    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ)
139      - requestsMapPrev.get(Metric.FILTERED_REGION_READ));
140    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ)
141      - requestsMapPrev.get(Metric.FILTERED_SERVER_READ));
142    assertEquals(expectedReadRequests, resultCount);
143  }
144
145  private static void updateMetricsMap() throws IOException, InterruptedException {
146    for (Metric metric : Metric.values()) {
147      requestsMapPrev.put(metric, requestsMap.get(metric));
148    }
149
150    ServerMetrics serverMetrics = null;
151    RegionMetrics regionMetricsOuter = null;
152    boolean metricsUpdated = false;
153    for (int i = 0; i < MAX_TRY; i++) {
154      for (ServerName serverName : serverNames) {
155        serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
156          .getLiveServerMetrics().get(serverName);
157
158        Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics();
159        RegionMetrics regionMetric = regionMetrics.get(regionInfo.getRegionName());
160        if (regionMetric != null) {
161          regionMetricsOuter = regionMetric;
162          for (Metric metric : Metric.values()) {
163            if (getReadRequest(serverMetrics, regionMetric, metric) > requestsMapPrev.get(metric)) {
164              for (Metric metricInner : Metric.values()) {
165                requestsMap.put(metricInner,
166                  getReadRequest(serverMetrics, regionMetric, metricInner));
167              }
168              metricsUpdated = true;
169              break;
170            }
171          }
172        }
173      }
174      if (metricsUpdated) {
175        break;
176      }
177      Thread.sleep(SLEEP_MS);
178    }
179    if (!metricsUpdated) {
180      for (Metric metric : Metric.values()) {
181        requestsMap.put(metric, getReadRequest(serverMetrics, regionMetricsOuter, metric));
182      }
183    }
184  }
185
186  private static long getReadRequest(ServerMetrics serverMetrics, RegionMetrics regionMetrics,
187    Metric metric) {
188    switch (metric) {
189      case REGION_READ:
190        return regionMetrics.getReadRequestCount();
191      case SERVER_READ:
192        return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName())
193          .getReadRequestCount();
194      case FILTERED_REGION_READ:
195        return regionMetrics.getFilteredReadRequestCount();
196      case FILTERED_SERVER_READ:
197        return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName())
198          .getFilteredReadRequestCount();
199      default:
200        throw new IllegalStateException();
201    }
202  }
203
204  private static void putData() throws IOException {
205    Put put;
206
207    put = new Put(ROW1);
208    put.addColumn(CF1, COL1, VAL1);
209    put.addColumn(CF1, COL2, VAL2);
210    put.addColumn(CF1, COL3, VAL3);
211    table.put(put);
212    put = new Put(ROW2);
213    put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1
214    put.addColumn(CF1, COL2, VAL2);
215    table.put(put);
216    put = new Put(ROW3);
217    put.addColumn(CF1, COL1, VAL1);
218    put.addColumn(CF1, COL2, VAL2);
219    table.put(put);
220  }
221
222  private static void putTTLExpiredData() throws IOException, InterruptedException {
223    Put put;
224
225    put = new Put(ROW1);
226    put.addColumn(CF2, COL1, VAL1);
227    put.addColumn(CF2, COL2, VAL2);
228    table.put(put);
229
230    Thread.sleep(TTL * 1000);
231
232    put = new Put(ROW2);
233    put.addColumn(CF2, COL1, VAL1);
234    put.addColumn(CF2, COL2, VAL2);
235    table.put(put);
236
237    put = new Put(ROW3);
238    put.addColumn(CF2, COL1, VAL1);
239    put.addColumn(CF2, COL2, VAL2);
240    table.put(put);
241  }
242
243  @AfterAll
244  public static void tearDownOnce() throws Exception {
245    TEST_UTIL.shutdownMiniCluster();
246  }
247
248  @Test
249  public void testReadRequestsCountNotFiltered() throws Exception {
250    int resultCount;
251    Scan scan;
252    Append append;
253    Put put;
254    Increment increment;
255    Get get;
256
257    // test for scan
258    scan = new Scan();
259    try (ResultScanner scanner = table.getScanner(scan)) {
260      resultCount = 0;
261      for (Result ignore : scanner) {
262        resultCount++;
263      }
264      testReadRequests(resultCount, 3, 0);
265    }
266
267    // test for scan
268    scan = new Scan().withStartRow(ROW2).withStopRow(ROW3);
269    try (ResultScanner scanner = table.getScanner(scan)) {
270      resultCount = 0;
271      for (Result ignore : scanner) {
272        resultCount++;
273      }
274      testReadRequests(resultCount, 1, 0);
275    }
276
277    // test for get
278    get = new Get(ROW2);
279    Result result = table.get(get);
280    resultCount = result.isEmpty() ? 0 : 1;
281    testReadRequests(resultCount, 1, 0);
282
283    // test for increment
284    increment = new Increment(ROW1);
285    increment.addColumn(CF1, COL3, 1);
286    result = table.increment(increment);
287    resultCount = result.isEmpty() ? 0 : 1;
288    testReadRequests(resultCount, 1, 0);
289
290    // test for checkAndPut
291    put = new Put(ROW1);
292    put.addColumn(CF1, COL2, VAL2);
293    boolean checkAndPut =
294      table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put);
295    resultCount = checkAndPut ? 1 : 0;
296    testReadRequests(resultCount, 1, 0);
297
298    // test for append
299    append = new Append(ROW1);
300    append.addColumn(CF1, COL2, VAL2);
301    result = table.append(append);
302    resultCount = result.isEmpty() ? 0 : 1;
303    testReadRequests(resultCount, 1, 0);
304
305    // test for checkAndMutate
306    put = new Put(ROW1);
307    put.addColumn(CF1, COL1, VAL1);
308    RowMutations rm = new RowMutations(ROW1);
309    rm.add(put);
310    boolean checkAndMutate =
311      table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm);
312    resultCount = checkAndMutate ? 1 : 0;
313    testReadRequests(resultCount, 1, 0);
314  }
315
316  @Disabled // HBASE-19785
317  @Test
318  public void testReadRequestsCountWithFilter() throws Exception {
319    int resultCount;
320    Scan scan;
321
322    // test for scan
323    scan = new Scan();
324    scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareOperator.EQUAL, VAL1));
325    try (ResultScanner scanner = table.getScanner(scan)) {
326      resultCount = 0;
327      for (Result ignore : scanner) {
328        resultCount++;
329      }
330      testReadRequests(resultCount, 2, 1);
331    }
332
333    // test for scan
334    scan = new Scan();
335    scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1)));
336    try (ResultScanner scanner = table.getScanner(scan)) {
337      resultCount = 0;
338      for (Result ignore : scanner) {
339        resultCount++;
340      }
341      testReadRequests(resultCount, 1, 2);
342    }
343
344    // test for scan
345    scan = new Scan().withStartRow(ROW2).withStopRow(ROW3);
346    scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1)));
347    try (ResultScanner scanner = table.getScanner(scan)) {
348      resultCount = 0;
349      for (Result ignore : scanner) {
350        resultCount++;
351      }
352      testReadRequests(resultCount, 0, 1);
353    }
354
355    // fixme filtered get should not increase readRequestsCount
356    // Get get = new Get(ROW2);
357    // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
358    // Result result = table.get(get);
359    // resultCount = result.isEmpty() ? 0 : 1;
360    // testReadRequests(resultCount, 0, 1);
361  }
362
363  @Disabled // HBASE-19785
364  @Test
365  public void testReadRequestsCountWithDeletedRow() throws Exception {
366    try {
367      Delete delete = new Delete(ROW3);
368      table.delete(delete);
369
370      Scan scan = new Scan();
371      try (ResultScanner scanner = table.getScanner(scan)) {
372        int resultCount = 0;
373        for (Result ignore : scanner) {
374          resultCount++;
375        }
376        testReadRequests(resultCount, 2, 1);
377      }
378    } finally {
379      Put put = new Put(ROW3);
380      put.addColumn(CF1, COL1, VAL1);
381      put.addColumn(CF1, COL2, VAL2);
382      table.put(put);
383    }
384  }
385
386  @Test
387  public void testReadRequestsCountWithTTLExpiration() throws Exception {
388    putTTLExpiredData();
389
390    Scan scan = new Scan();
391    scan.addFamily(CF2);
392    try (ResultScanner scanner = table.getScanner(scan)) {
393      int resultCount = 0;
394      for (Result ignore : scanner) {
395        resultCount++;
396      }
397      testReadRequests(resultCount, 2, 1);
398    }
399  }
400
401  @Disabled // See HBASE-19785
402  @Test
403  public void testReadRequestsWithCoprocessor() throws Exception {
404    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
405    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
406    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
407    builder.setCoprocessor(ScanRegionCoprocessor.class.getName());
408    admin.createTable(builder.build());
409
410    try {
411      TEST_UTIL.waitTableAvailable(tableName);
412      List<RegionInfo> regionInfos = admin.getRegions(tableName);
413      assertEquals(1, regionInfos.size(), "Table " + TABLE_NAME + " should have 1 region");
414      boolean success = true;
415      int i = 0;
416      for (; i < MAX_TRY; i++) {
417        try {
418          testReadRequests(regionInfos.get(0).getRegionName(), 3);
419        } catch (Throwable t) {
420          LOG.warn("Got exception when try " + i + " times", t);
421          Thread.sleep(SLEEP_MS);
422          success = false;
423        }
424        if (success) {
425          break;
426        }
427      }
428      if (i == MAX_TRY) {
429        fail("Failed to get right read requests metric after try " + i + " times");
430      }
431    } finally {
432      admin.disableTable(tableName);
433      admin.deleteTable(tableName);
434    }
435  }
436
437  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
438    for (ServerName serverName : serverNames) {
439      ServerMetrics serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
440        .getLiveServerMetrics().get(serverName);
441      Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics();
442      RegionMetrics regionMetric = regionMetrics.get(regionName);
443      if (regionMetric != null) {
444        LOG.debug("server read request is "
445          + serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount()
446          + ", region read request is " + regionMetric.getReadRequestCount());
447        assertEquals(3, serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount());
448        assertEquals(3, regionMetric.getReadRequestCount());
449      }
450    }
451  }
452
453  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
454    @Override
455    public Optional<RegionObserver> getRegionObserver() {
456      return Optional.of(this);
457    }
458
459    @Override
460    public void postOpen(ObserverContext<? extends RegionCoprocessorEnvironment> c) {
461      RegionCoprocessorEnvironment env = c.getEnvironment();
462      Region region = env.getRegion();
463      try {
464        putData(region);
465        RegionScanner scanner = region.getScanner(new Scan());
466        List<Cell> result = new LinkedList<>();
467        while (scanner.next(result)) {
468          result.clear();
469        }
470      } catch (Exception e) {
471        LOG.warn("Got exception in coprocessor", e);
472      }
473    }
474
475    private void putData(Region region) throws Exception {
476      Put put = new Put(ROW1);
477      put.addColumn(CF1, COL1, VAL1);
478      region.put(put);
479      put = new Put(ROW2);
480      put.addColumn(CF1, COL1, VAL1);
481      region.put(put);
482      put = new Put(ROW3);
483      put.addColumn(CF1, COL1, VAL1);
484      region.put(put);
485    }
486  }
487
  /**
   * The counters tracked in the metric maps. Each constant selects both a counter (read requests or
   * filtered read requests) and the path by which {@code getReadRequest} reaches it.
   */
  private enum Metric {
    /** Read-request count taken directly from the region metrics object. */
    REGION_READ,
    /** Read-request count of the region, looked up by name in the server's region-metrics map. */
    SERVER_READ,
    /** Filtered read-request count taken directly from the region metrics object. */
    FILTERED_REGION_READ,
    /** Filtered read-request count of the region, looked up by name in the server's map. */
    FILTERED_SERVER_READ
  }
494}