/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

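/**
 * Verifies the read request metrics (readRequestsCount and filteredReadRequestsCount) reported at
 * both the region and the server level after scans, gets, increments, appends, check-and-mutate
 * operations, filtered reads, row deletes, TTL expiration, and coprocessor-driven scans.
 */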
@Ignore // Depends on Master being able to host regions. Needs fixing.
@Category(MediumTests.class)
public class TestRegionServerReadRequestMetrics {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class);
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final byte[] CF1 = Bytes.toBytes("c1");
  private static final byte[] CF2 = Bytes.toBytes("c2");

  private static final byte[] ROW1 = Bytes.toBytes("a");
  private static final byte[] ROW2 = Bytes.toBytes("b");
  private static final byte[] ROW3 = Bytes.toBytes("c");
  private static final byte[] COL1 = Bytes.toBytes("q1");
  private static final byte[] COL2 = Bytes.toBytes("q2");
  private static final byte[] COL3 = Bytes.toBytes("q3");
  private static final byte[] VAL1 = Bytes.toBytes("v1");
  private static final byte[] VAL2 = Bytes.toBytes("v2");
  private static final byte[] VAL3 = Bytes.toBytes(0L);

  private static final int MAX_TRY = 20;
  private static final int SLEEP_MS = 100;
  private static final int TTL = 1;

  private static Admin admin;
  private static Collection<ServerName> serverNames;
  private static Table table;
  private static RegionInfo regionInfo;

  private static Map<Metric, Long> requestsMap = new HashMap<>();
  private static Map<Metric, Long> requestsMapPrev = new HashMap<>();

  @BeforeClass
  public static void setUpOnce() throws Exception {
    // Default starts one regionserver only.
    TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
    // TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
    TEST_UTIL.startMiniCluster();
    admin = TEST_UTIL.getAdmin();
    serverNames =
      admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().keySet();
    table = createTable();
    putData();
    List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
    assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
    regionInfo = regions.get(0);

    for (Metric metric : Metric.values()) {
      requestsMap.put(metric, 0L);
      requestsMapPrev.put(metric, 0L);
    }
  }

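  /**
   * Creates the test table with two column families: CF1 with default settings and CF2 with a
   * one-second TTL, which the TTL-expiration test relies on.
   */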
  private static Table createTable() throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
    builder
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL).build());
    admin.createTable(builder.build());
    return TEST_UTIL.getConnection().getTable(TABLE_NAME);
  }

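  /**
   * Asserts that the read request metrics advanced by the expected amounts since the previous
   * call: {@code expectedReadRequests} for the region read count (and, when tables are hosted on
   * the master, the server read count), and {@code expectedFilteredReadRequests} for the filtered
   * read counts. Also asserts that the caller observed exactly {@code expectedReadRequests}
   * results ({@code resultCount}).
   */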
  private static void testReadRequests(long resultCount, long expectedReadRequests,
    long expectedFilteredReadRequests) throws IOException, InterruptedException {
    updateMetricsMap();
    LOG.debug("requestsMapPrev = {}", requestsMapPrev);
    LOG.debug("requestsMap = {}", requestsMap);

    assertEquals(expectedReadRequests,
      requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ));
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration());
    if (tablesOnMaster) {
      // When tables are NOT on the master, the single regionserver in this test carries both the
      // user-space tables and the meta table, so the first test to run sees its server-level count
      // inflated by meta lookups (and junit randomizes the order of the tests sharing this
      // method). Only assert on the server-level metric when the master hosts the (system)
      // regions.
      assertEquals(expectedReadRequests,
        requestsMap.get(Metric.SERVER_READ) - requestsMapPrev.get(Metric.SERVER_READ));
    }
    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_REGION_READ)
      - requestsMapPrev.get(Metric.FILTERED_REGION_READ));
    assertEquals(expectedFilteredReadRequests, requestsMap.get(Metric.FILTERED_SERVER_READ)
      - requestsMapPrev.get(Metric.FILTERED_SERVER_READ));
    assertEquals(expectedReadRequests, resultCount);
  }

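  /**
   * Copies the current metric snapshot into {@code requestsMapPrev}, then polls the cluster
   * metrics until at least one read request counter has advanced past the previous snapshot, or
   * {@code MAX_TRY} attempts (with {@code SLEEP_MS} pauses) have elapsed. If the counters never
   * advance, the last values fetched are recorded anyway.
   */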
  private static void updateMetricsMap() throws IOException, InterruptedException {
    for (Metric metric : Metric.values()) {
      requestsMapPrev.put(metric, requestsMap.get(metric));
    }

    ServerLoad serverLoad = null;
    RegionLoad regionLoadOuter = null;
    boolean metricsUpdated = false;
    for (int i = 0; i < MAX_TRY; i++) {
      for (ServerName serverName : serverNames) {
        serverLoad = new ServerLoad(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
          .getLiveServerMetrics().get(serverName));

        Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
        RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
        if (regionLoad != null) {
          regionLoadOuter = regionLoad;
          for (Metric metric : Metric.values()) {
            if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
              for (Metric metricInner : Metric.values()) {
                requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
              }
              metricsUpdated = true;
              break;
            }
          }
        }
      }
      if (metricsUpdated) {
        break;
      }
      Thread.sleep(SLEEP_MS);
    }
    if (!metricsUpdated) {
      for (Metric metric : Metric.values()) {
        requestsMap.put(metric, getReadRequest(serverLoad, regionLoadOuter, metric));
      }
    }
  }

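  /** Returns the counter that backs the given {@link Metric} from the region or server load. */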
  private static long getReadRequest(ServerLoad serverLoad, RegionLoad regionLoad, Metric metric) {
    switch (metric) {
      case REGION_READ:
        return regionLoad.getReadRequestsCount();
      case SERVER_READ:
        return serverLoad.getReadRequestsCount();
      case FILTERED_REGION_READ:
        return regionLoad.getFilteredReadRequestsCount();
      case FILTERED_SERVER_READ:
        return serverLoad.getFilteredReadRequestsCount();
      default:
        throw new IllegalStateException();
    }
  }

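  /**
   * Seeds CF1 with three rows; ROW1 carries an extra long-typed column (COL3) for increments, and
   * ROW2 intentionally stores VAL2 in COL1 so value filters exclude it.
   */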
  private static void putData() throws IOException {
    Put put;

    put = new Put(ROW1);
    put.addColumn(CF1, COL1, VAL1);
    put.addColumn(CF1, COL2, VAL2);
    put.addColumn(CF1, COL3, VAL3);
    table.put(put);
    put = new Put(ROW2);
    put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1
    put.addColumn(CF1, COL2, VAL2);
    table.put(put);
    put = new Put(ROW3);
    put.addColumn(CF1, COL1, VAL1);
    put.addColumn(CF1, COL2, VAL2);
    table.put(put);
  }

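  /**
   * Writes ROW1 to CF2, waits long enough for its one-second TTL to expire, then writes ROW2 and
   * ROW3, so a subsequent scan of CF2 sees only the last two rows.
   */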
  private static void putTTLExpiredData() throws IOException, InterruptedException {
    Put put;

    put = new Put(ROW1);
    put.addColumn(CF2, COL1, VAL1);
    put.addColumn(CF2, COL2, VAL2);
    table.put(put);

    Thread.sleep(TTL * 1000);

    put = new Put(ROW2);
    put.addColumn(CF2, COL1, VAL1);
    put.addColumn(CF2, COL2, VAL2);
    table.put(put);

    put = new Put(ROW3);
    put.addColumn(CF2, COL1, VAL1);
    put.addColumn(CF2, COL2, VAL2);
    table.put(put);
  }

  @AfterClass
  public static void tearDownOnce() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testReadRequestsCountNotFiltered() throws Exception {
    int resultCount;
    Scan scan;
    Append append;
    Put put;
    Increment increment;
    Get get;

    // test for scan
    scan = new Scan();
    try (ResultScanner scanner = table.getScanner(scan)) {
      resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 3, 0);
    }

    // test for scan
    scan = new Scan(ROW2, ROW3);
    try (ResultScanner scanner = table.getScanner(scan)) {
      resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 1, 0);
    }

    // test for get
    get = new Get(ROW2);
    Result result = table.get(get);
    resultCount = result.isEmpty() ? 0 : 1;
    testReadRequests(resultCount, 1, 0);

    // test for increment
    increment = new Increment(ROW1);
    increment.addColumn(CF1, COL3, 1);
    result = table.increment(increment);
    resultCount = result.isEmpty() ? 0 : 1;
    testReadRequests(resultCount, 1, 0);

    // test for checkAndPut
    put = new Put(ROW1);
    put.addColumn(CF1, COL2, VAL2);
    boolean checkAndPut =
      table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put);
    resultCount = checkAndPut ? 1 : 0;
    testReadRequests(resultCount, 1, 0);

    // test for append
    append = new Append(ROW1);
    append.addColumn(CF1, COL2, VAL2);
    result = table.append(append);
    resultCount = result.isEmpty() ? 0 : 1;
    testReadRequests(resultCount, 1, 0);

    // test for checkAndMutate
    put = new Put(ROW1);
    put.addColumn(CF1, COL1, VAL1);
    RowMutations rm = new RowMutations(ROW1);
    rm.add(put);
    boolean checkAndMutate =
      table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm);
    resultCount = checkAndMutate ? 1 : 0;
    testReadRequests(resultCount, 1, 0);
  }

  @Ignore // HBASE-19785
  @Test
  public void testReadRequestsCountWithFilter() throws Exception {
    int resultCount;
    Scan scan;

    // test for scan
    scan = new Scan();
    scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
    try (ResultScanner scanner = table.getScanner(scan)) {
      resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 2, 1);
    }

    // test for scan
    scan = new Scan();
    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
    try (ResultScanner scanner = table.getScanner(scan)) {
      resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 1, 2);
    }

    // test for scan
    scan = new Scan(ROW2, ROW3);
    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
    try (ResultScanner scanner = table.getScanner(scan)) {
      resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 0, 1);
    }

    // fixme filtered get should not increase readRequestsCount
    // Get get = new Get(ROW2);
    // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
    // Result result = table.get(get);
    // resultCount = result.isEmpty() ? 0 : 1;
    // testReadRequests(resultCount, 0, 1);
  }

  @Ignore // HBASE-19785
  @Test
  public void testReadRequestsCountWithDeletedRow() throws Exception {
    try {
      Delete delete = new Delete(ROW3);
      table.delete(delete);

      Scan scan = new Scan();
      try (ResultScanner scanner = table.getScanner(scan)) {
        int resultCount = 0;
        for (Result ignore : scanner) {
          resultCount++;
        }
        testReadRequests(resultCount, 2, 1);
      }
    } finally {
      Put put = new Put(ROW3);
      put.addColumn(CF1, COL1, VAL1);
      put.addColumn(CF1, COL2, VAL2);
      table.put(put);
    }
  }

  @Test
  public void testReadRequestsCountWithTTLExpiration() throws Exception {
    putTTLExpiredData();

    Scan scan = new Scan();
    scan.addFamily(CF2);
    try (ResultScanner scanner = table.getScanner(scan)) {
      int resultCount = 0;
      for (Result ignore : scanner) {
        resultCount++;
      }
      testReadRequests(resultCount, 2, 1);
    }
  }

  @Ignore // See HBASE-19785
  @Test
  public void testReadRequestsWithCoprocessor() throws Exception {
    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
    builder.setCoprocessor(ScanRegionCoprocessor.class.getName());
    admin.createTable(builder.build());

    try {
      TEST_UTIL.waitTableAvailable(tableName);
      List<RegionInfo> regionInfos = admin.getRegions(tableName);
      assertEquals("Table " + tableName + " should have 1 region", 1, regionInfos.size());
      boolean success = true;
      int i = 0;
      for (; i < MAX_TRY; i++) {
        // Reset the flag each attempt so a later successful check can end the retry loop.
        success = true;
        try {
          testReadRequests(regionInfos.get(0).getRegionName(), 3);
        } catch (Throwable t) {
          LOG.warn("Got exception on try " + i, t);
          Thread.sleep(SLEEP_MS);
          success = false;
        }
        if (success) {
          break;
        }
      }
      if (i == MAX_TRY) {
        fail("Failed to get the expected read requests metric after " + MAX_TRY + " tries");
      }
    } finally {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
  }

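  /**
   * Checks that the region identified by {@code regionName}, and the server hosting it, report
   * {@code expectedReadRequests} read requests; throws an {@link AssertionError} (caught and
   * retried by the caller) if the metrics have not caught up yet.
   */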
  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
    for (ServerName serverName : serverNames) {
      ServerLoad serverLoad = new ServerLoad(admin
        .getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().get(serverName));
      Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
      RegionLoad regionLoad = regionsLoad.get(regionName);
      if (regionLoad != null) {
        LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
          + ", region read request is " + regionLoad.getReadRequestsCount());
        assertEquals(expectedReadRequests, serverLoad.getReadRequestsCount());
        assertEquals(expectedReadRequests, regionLoad.getReadRequestsCount());
      }
    }
  }

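  /**
   * Region coprocessor that, when its region opens, writes three rows and scans them back, so the
   * region accumulates read requests without any client-side reads.
   */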
  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
      RegionCoprocessorEnvironment env = c.getEnvironment();
      Region region = env.getRegion();
      try {
        putData(region);
        RegionScanner scanner = region.getScanner(new Scan());
        List<Cell> result = new LinkedList<>();
        while (scanner.next(result)) {
          result.clear();
        }
      } catch (Exception e) {
        LOG.warn("Got exception in coprocessor", e);
      }
    }

    private void putData(Region region) throws Exception {
      Put put = new Put(ROW1);
      put.addColumn(CF1, COL1, VAL1);
      region.put(put);
      put = new Put(ROW2);
      put.addColumn(CF1, COL1, VAL1);
      region.put(put);
      put = new Put(ROW3);
      put.addColumn(CF1, COL1, VAL1);
      region.put(put);
    }
  }

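  /** The four read request counters tracked by this test: region and server scope, unfiltered and filtered. */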
  private enum Metric {
    REGION_READ,
    SERVER_READ,
    FILTERED_REGION_READ,
    FILTERED_SERVER_READ
  }
}