001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ClusterMetrics.Option;
033import org.apache.hadoop.hbase.CompareOperator;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.RegionMetrics;
037import org.apache.hadoop.hbase.ServerMetrics;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.Append;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.Increment;
046import org.apache.hadoop.hbase.client.Put;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.RowMutations;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.coprocessor.ObserverContext;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
057import org.apache.hadoop.hbase.coprocessor.RegionObserver;
058import org.apache.hadoop.hbase.filter.BinaryComparator;
059import org.apache.hadoop.hbase.filter.RowFilter;
060import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
061import org.apache.hadoop.hbase.testclassification.MediumTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.junit.AfterClass;
064import org.junit.BeforeClass;
065import org.junit.ClassRule;
066import org.junit.Ignore;
067import org.junit.Test;
068import org.junit.experimental.categories.Category;
069import org.slf4j.Logger;
070import org.slf4j.LoggerFactory;
071
072@Ignore // Depends on Master being able to host regions. Needs fixing.
073@Category(MediumTests.class)
074public class TestRegionServerReadRequestMetrics {
075
  // JUnit class rule built for this test class through HBaseClassTestRule.forClass.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class);
  // Mini-cluster harness: started in setUpOnce() and shut down in tearDownOnce().
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  // Table created by createTable(); testReadRequestsWithCoprocessor creates its own table.
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  // CF1 is created with default settings, CF2 with a time-to-live of TTL (see createTable()).
  private static final byte[] CF1 = Bytes.toBytes("c1");
  private static final byte[] CF2 = Bytes.toBytes("c2");

  // Row keys, qualifiers and values written by putData() / putTTLExpiredData().
  private static final byte[] ROW1 = Bytes.toBytes("a");
  private static final byte[] ROW2 = Bytes.toBytes("b");
  private static final byte[] ROW3 = Bytes.toBytes("c");
  private static final byte[] COL1 = Bytes.toBytes("q1");
  private static final byte[] COL2 = Bytes.toBytes("q2");
  private static final byte[] COL3 = Bytes.toBytes("q3");
  private static final byte[] VAL1 = Bytes.toBytes("v1");
  private static final byte[] VAL2 = Bytes.toBytes("v2");
  // Long-encoded zero; stored in ROW1/COL3, the cell that the increment test bumps by 1.
  private static final byte[] VAL3 = Bytes.toBytes(0L);

  // Upper bound on polling attempts while waiting for metrics to change.
  private static final int MAX_TRY = 20;
  // Pause between polling attempts, passed straight to Thread.sleep (milliseconds).
  private static final int SLEEP_MS = 100;
  // Passed to setTimeToLive for CF2 and multiplied by 1000 for Thread.sleep, i.e. used as seconds.
  private static final int TTL = 1;

  // Shared handles populated once in setUpOnce().
  private static Admin admin;
  // Key set of the live-server metrics captured right after the mini cluster starts.
  private static Collection<ServerName> serverNames;
  private static Table table;
  // The single region of TABLE_NAME (setUpOnce() asserts there is exactly one).
  private static RegionInfo regionInfo;

  // Latest counter snapshot and the one before it; tests assert on the difference between them.
  private static Map<Metric, Long> requestsMap = new HashMap<>();
  private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
108
109  @BeforeClass
110  public static void setUpOnce() throws Exception {
111    TEST_UTIL.startMiniCluster();
112    admin = TEST_UTIL.getAdmin();
113    serverNames =
114      admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet();
115    table = createTable();
116    putData();
117    List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
118    assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
119    regionInfo = regions.get(0);
120
121    for (Metric metric : Metric.values()) {
122      requestsMap.put(metric, 0L);
123      requestsMapPrev.put(metric, 0L);
124    }
125  }
126
127  private static Table createTable() throws IOException {
128    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
129    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
130    builder
131      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build());
132    admin.createTable(builder.build());
133    return TEST_UTIL.getConnection().getTable(TABLE_NAME);
134  }
135
136  private static void testReadRequests(long resultCount, long expectedReadRequests,
137    long expectedFilteredReadRequests) throws IOException, InterruptedException {
138    updateMetricsMap();
139    System.out.println("requestsMapPrev = " + requestsMapPrev);
140    System.out.println("requestsMap = " + requestsMap);
141
142    assertEquals(expectedReadRequests,
143      requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ));
144    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ)
145      - requestsMapPrev.get(Metric.FILTERED_REGION_READ));
146    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ)
147      - requestsMapPrev.get(Metric.FILTERED_SERVER_READ));
148    assertEquals(expectedReadRequests, resultCount);
149  }
150
151  private static void updateMetricsMap() throws IOException, InterruptedException {
152    for (Metric metric : Metric.values()) {
153      requestsMapPrev.put(metric, requestsMap.get(metric));
154    }
155
156    ServerMetrics serverMetrics = null;
157    RegionMetrics regionMetricsOuter = null;
158    boolean metricsUpdated = false;
159    for (int i = 0; i < MAX_TRY; i++) {
160      for (ServerName serverName : serverNames) {
161        serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
162          .getLiveServerMetrics().get(serverName);
163
164        Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics();
165        RegionMetrics regionMetric = regionMetrics.get(regionInfo.getRegionName());
166        if (regionMetric != null) {
167          regionMetricsOuter = regionMetric;
168          for (Metric metric : Metric.values()) {
169            if (getReadRequest(serverMetrics, regionMetric, metric) > requestsMapPrev.get(metric)) {
170              for (Metric metricInner : Metric.values()) {
171                requestsMap.put(metricInner,
172                  getReadRequest(serverMetrics, regionMetric, metricInner));
173              }
174              metricsUpdated = true;
175              break;
176            }
177          }
178        }
179      }
180      if (metricsUpdated) {
181        break;
182      }
183      Thread.sleep(SLEEP_MS);
184    }
185    if (!metricsUpdated) {
186      for (Metric metric : Metric.values()) {
187        requestsMap.put(metric, getReadRequest(serverMetrics, regionMetricsOuter, metric));
188      }
189    }
190  }
191
192  private static long getReadRequest(ServerMetrics serverMetrics, RegionMetrics regionMetrics,
193    Metric metric) {
194    switch (metric) {
195      case REGION_READ:
196        return regionMetrics.getReadRequestCount();
197      case SERVER_READ:
198        return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName())
199          .getReadRequestCount();
200      case FILTERED_REGION_READ:
201        return regionMetrics.getFilteredReadRequestCount();
202      case FILTERED_SERVER_READ:
203        return serverMetrics.getRegionMetrics().get(regionMetrics.getRegionName())
204          .getFilteredReadRequestCount();
205      default:
206        throw new IllegalStateException();
207    }
208  }
209
210  private static void putData() throws IOException {
211    Put put;
212
213    put = new Put(ROW1);
214    put.addColumn(CF1, COL1, VAL1);
215    put.addColumn(CF1, COL2, VAL2);
216    put.addColumn(CF1, COL3, VAL3);
217    table.put(put);
218    put = new Put(ROW2);
219    put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1
220    put.addColumn(CF1, COL2, VAL2);
221    table.put(put);
222    put = new Put(ROW3);
223    put.addColumn(CF1, COL1, VAL1);
224    put.addColumn(CF1, COL2, VAL2);
225    table.put(put);
226  }
227
228  private static void putTTLExpiredData() throws IOException, InterruptedException {
229    Put put;
230
231    put = new Put(ROW1);
232    put.addColumn(CF2, COL1, VAL1);
233    put.addColumn(CF2, COL2, VAL2);
234    table.put(put);
235
236    Thread.sleep(TTL * 1000);
237
238    put = new Put(ROW2);
239    put.addColumn(CF2, COL1, VAL1);
240    put.addColumn(CF2, COL2, VAL2);
241    table.put(put);
242
243    put = new Put(ROW3);
244    put.addColumn(CF2, COL1, VAL1);
245    put.addColumn(CF2, COL2, VAL2);
246    table.put(put);
247  }
248
249  @AfterClass
250  public static void tearDownOnce() throws Exception {
251    TEST_UTIL.shutdownMiniCluster();
252  }
253
254  @Test
255  public void testReadRequestsCountNotFiltered() throws Exception {
256    int resultCount;
257    Scan scan;
258    Append append;
259    Put put;
260    Increment increment;
261    Get get;
262
263    // test for scan
264    scan = new Scan();
265    try (ResultScanner scanner = table.getScanner(scan)) {
266      resultCount = 0;
267      for (Result ignore : scanner) {
268        resultCount++;
269      }
270      testReadRequests(resultCount, 3, 0);
271    }
272
273    // test for scan
274    scan = new Scan().withStartRow(ROW2).withStopRow(ROW3);
275    try (ResultScanner scanner = table.getScanner(scan)) {
276      resultCount = 0;
277      for (Result ignore : scanner) {
278        resultCount++;
279      }
280      testReadRequests(resultCount, 1, 0);
281    }
282
283    // test for get
284    get = new Get(ROW2);
285    Result result = table.get(get);
286    resultCount = result.isEmpty() ? 0 : 1;
287    testReadRequests(resultCount, 1, 0);
288
289    // test for increment
290    increment = new Increment(ROW1);
291    increment.addColumn(CF1, COL3, 1);
292    result = table.increment(increment);
293    resultCount = result.isEmpty() ? 0 : 1;
294    testReadRequests(resultCount, 1, 0);
295
296    // test for checkAndPut
297    put = new Put(ROW1);
298    put.addColumn(CF1, COL2, VAL2);
299    boolean checkAndPut =
300      table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put);
301    resultCount = checkAndPut ? 1 : 0;
302    testReadRequests(resultCount, 1, 0);
303
304    // test for append
305    append = new Append(ROW1);
306    append.addColumn(CF1, COL2, VAL2);
307    result = table.append(append);
308    resultCount = result.isEmpty() ? 0 : 1;
309    testReadRequests(resultCount, 1, 0);
310
311    // test for checkAndMutate
312    put = new Put(ROW1);
313    put.addColumn(CF1, COL1, VAL1);
314    RowMutations rm = new RowMutations(ROW1);
315    rm.add(put);
316    boolean checkAndMutate =
317      table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm);
318    resultCount = checkAndMutate ? 1 : 0;
319    testReadRequests(resultCount, 1, 0);
320  }
321
322  @Ignore // HBASE-19785
323  @Test
324  public void testReadRequestsCountWithFilter() throws Exception {
325    int resultCount;
326    Scan scan;
327
328    // test for scan
329    scan = new Scan();
330    scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareOperator.EQUAL, VAL1));
331    try (ResultScanner scanner = table.getScanner(scan)) {
332      resultCount = 0;
333      for (Result ignore : scanner) {
334        resultCount++;
335      }
336      testReadRequests(resultCount, 2, 1);
337    }
338
339    // test for scan
340    scan = new Scan();
341    scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1)));
342    try (ResultScanner scanner = table.getScanner(scan)) {
343      resultCount = 0;
344      for (Result ignore : scanner) {
345        resultCount++;
346      }
347      testReadRequests(resultCount, 1, 2);
348    }
349
350    // test for scan
351    scan = new Scan().withStartRow(ROW2).withStopRow(ROW3);
352    scan.setFilter(new RowFilter(CompareOperator.EQUAL, new BinaryComparator(ROW1)));
353    try (ResultScanner scanner = table.getScanner(scan)) {
354      resultCount = 0;
355      for (Result ignore : scanner) {
356        resultCount++;
357      }
358      testReadRequests(resultCount, 0, 1);
359    }
360
361    // fixme filtered get should not increase readRequestsCount
362    // Get get = new Get(ROW2);
363    // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
364    // Result result = table.get(get);
365    // resultCount = result.isEmpty() ? 0 : 1;
366    // testReadRequests(resultCount, 0, 1);
367  }
368
369  @Ignore // HBASE-19785
370  @Test
371  public void testReadRequestsCountWithDeletedRow() throws Exception {
372    try {
373      Delete delete = new Delete(ROW3);
374      table.delete(delete);
375
376      Scan scan = new Scan();
377      try (ResultScanner scanner = table.getScanner(scan)) {
378        int resultCount = 0;
379        for (Result ignore : scanner) {
380          resultCount++;
381        }
382        testReadRequests(resultCount, 2, 1);
383      }
384    } finally {
385      Put put = new Put(ROW3);
386      put.addColumn(CF1, COL1, VAL1);
387      put.addColumn(CF1, COL2, VAL2);
388      table.put(put);
389    }
390  }
391
392  @Test
393  public void testReadRequestsCountWithTTLExpiration() throws Exception {
394    putTTLExpiredData();
395
396    Scan scan = new Scan();
397    scan.addFamily(CF2);
398    try (ResultScanner scanner = table.getScanner(scan)) {
399      int resultCount = 0;
400      for (Result ignore : scanner) {
401        resultCount++;
402      }
403      testReadRequests(resultCount, 2, 1);
404    }
405  }
406
407  @Ignore // See HBASE-19785
408  @Test
409  public void testReadRequestsWithCoprocessor() throws Exception {
410    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
411    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
412    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
413    builder.setCoprocessor(ScanRegionCoprocessor.class.getName());
414    admin.createTable(builder.build());
415
416    try {
417      TEST_UTIL.waitTableAvailable(tableName);
418      List<RegionInfo> regionInfos = admin.getRegions(tableName);
419      assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size());
420      boolean success = true;
421      int i = 0;
422      for (; i < MAX_TRY; i++) {
423        try {
424          testReadRequests(regionInfos.get(0).getRegionName(), 3);
425        } catch (Throwable t) {
426          LOG.warn("Got exception when try " + i + " times", t);
427          Thread.sleep(SLEEP_MS);
428          success = false;
429        }
430        if (success) {
431          break;
432        }
433      }
434      if (i == MAX_TRY) {
435        fail("Failed to get right read requests metric after try " + i + " times");
436      }
437    } finally {
438      admin.disableTable(tableName);
439      admin.deleteTable(tableName);
440    }
441  }
442
443  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
444    for (ServerName serverName : serverNames) {
445      ServerMetrics serverMetrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
446        .getLiveServerMetrics().get(serverName);
447      Map<byte[], RegionMetrics> regionMetrics = serverMetrics.getRegionMetrics();
448      RegionMetrics regionMetric = regionMetrics.get(regionName);
449      if (regionMetric != null) {
450        LOG.debug("server read request is "
451          + serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount()
452          + ", region read request is " + regionMetric.getReadRequestCount());
453        assertEquals(3, serverMetrics.getRegionMetrics().get(regionName).getReadRequestCount());
454        assertEquals(3, regionMetric.getReadRequestCount());
455      }
456    }
457  }
458
459  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
460    @Override
461    public Optional<RegionObserver> getRegionObserver() {
462      return Optional.of(this);
463    }
464
465    @Override
466    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
467      RegionCoprocessorEnvironment env = c.getEnvironment();
468      Region region = env.getRegion();
469      try {
470        putData(region);
471        RegionScanner scanner = region.getScanner(new Scan());
472        List<Cell> result = new LinkedList<>();
473        while (scanner.next(result)) {
474          result.clear();
475        }
476      } catch (Exception e) {
477        LOG.warn("Got exception in coprocessor", e);
478      }
479    }
480
481    private void putData(Region region) throws Exception {
482      Put put = new Put(ROW1);
483      put.addColumn(CF1, COL1, VAL1);
484      region.put(put);
485      put = new Put(ROW2);
486      put.addColumn(CF1, COL1, VAL1);
487      region.put(put);
488      put = new Put(ROW3);
489      put.addColumn(CF1, COL1, VAL1);
490      region.put(put);
491    }
492  }
493
  /**
   * Counters tracked by this test; getReadRequest() maps each constant to the accessor it reads.
   */
  private enum Metric {
    // RegionMetrics.getReadRequestCount() of the region metrics passed in directly.
    REGION_READ,
    // getReadRequestCount() of the same region, looked up through ServerMetrics.getRegionMetrics().
    SERVER_READ,
    // RegionMetrics.getFilteredReadRequestCount() of the region metrics passed in directly.
    FILTERED_REGION_READ,
    // getFilteredReadRequestCount() of the region, looked up through ServerMetrics.getRegionMetrics().
    FILTERED_SERVER_READ
  }
500}