001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.fail;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.ClusterMetrics.Option;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseTestingUtility;
035import org.apache.hadoop.hbase.RegionLoad;
036import org.apache.hadoop.hbase.ServerLoad;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.Append;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.RowMutations;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.coprocessor.ObserverContext;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
056import org.apache.hadoop.hbase.coprocessor.RegionObserver;
057import org.apache.hadoop.hbase.filter.BinaryComparator;
058import org.apache.hadoop.hbase.filter.CompareFilter;
059import org.apache.hadoop.hbase.filter.RowFilter;
060import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
061import org.apache.hadoop.hbase.master.LoadBalancer;
062import org.apache.hadoop.hbase.testclassification.MediumTests;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.junit.AfterClass;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Ignore;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073@Ignore // Depends on Master being able to host regions. Needs fixing.
074@Category(MediumTests.class)
075public class TestRegionServerReadRequestMetrics {
076
077  @ClassRule
078  public static final HBaseClassTestRule CLASS_RULE =
079      HBaseClassTestRule.forClass(TestRegionServerReadRequestMetrics.class);
080
081  private static final Logger LOG =
082      LoggerFactory.getLogger(TestRegionServerReadRequestMetrics.class);
083  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
084  private static final TableName TABLE_NAME = TableName.valueOf("test");
085  private static final byte[] CF1 = "c1".getBytes();
086  private static final byte[] CF2 = "c2".getBytes();
087
088  private static final byte[] ROW1 = "a".getBytes();
089  private static final byte[] ROW2 = "b".getBytes();
090  private static final byte[] ROW3 = "c".getBytes();
091  private static final byte[] COL1 = "q1".getBytes();
092  private static final byte[] COL2 = "q2".getBytes();
093  private static final byte[] COL3 = "q3".getBytes();
094  private static final byte[] VAL1 = "v1".getBytes();
095  private static final byte[] VAL2 = "v2".getBytes();
096  private static final byte[] VAL3 = Bytes.toBytes(0L);
097
098  private static final int MAX_TRY = 20;
099  private static final int SLEEP_MS = 100;
100  private static final int TTL = 1;
101
102  private static Admin admin;
103  private static Collection<ServerName> serverNames;
104  private static Table table;
105  private static RegionInfo regionInfo;
106
107  private static Map<Metric, Long> requestsMap = new HashMap<>();
108  private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
109
110  @BeforeClass
111  public static void setUpOnce() throws Exception {
112    // Default starts one regionserver only.
113    TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
114    // TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
115    TEST_UTIL.startMiniCluster();
116    admin = TEST_UTIL.getAdmin();
117    serverNames = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
118      .getLiveServerMetrics().keySet();
119    table = createTable();
120    putData();
121    List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
122    assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
123    regionInfo = regions.get(0);
124
125    for (Metric metric : Metric.values()) {
126      requestsMap.put(metric, 0L);
127      requestsMapPrev.put(metric, 0L);
128    }
129  }
130
131  private static Table createTable() throws IOException {
132    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
133    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
134    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL)
135        .build());
136    admin.createTable(builder.build());
137    return TEST_UTIL.getConnection().getTable(TABLE_NAME);
138  }
139
140  private static void testReadRequests(long resultCount,
141    long expectedReadRequests, long expectedFilteredReadRequests)
142    throws IOException, InterruptedException {
143    updateMetricsMap();
144    System.out.println("requestsMapPrev = " + requestsMapPrev);
145    System.out.println("requestsMap = " + requestsMap);
146
147    assertEquals(expectedReadRequests,
148      requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ));
149    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration());
150    if (tablesOnMaster) {
151      // If NO tables on master, then the single regionserver in this test carries user-space
152      // tables and the meta table. The first time through, the read will be inflated by meta
153      // lookups. We don't know which test will be first through since junit randomizes. This
154      // method is used by a bunch of tests. Just do this check if master is hosting (system)
155      // regions only.
156      assertEquals(expectedReadRequests,
157      requestsMap.get(Metric.SERVER_READ) - requestsMapPrev.get(Metric.SERVER_READ));
158    }
159    assertEquals(expectedFilteredReadRequests,
160      requestsMap.get(Metric.FILTERED_REGION_READ)
161        - requestsMapPrev.get(Metric.FILTERED_REGION_READ));
162    assertEquals(expectedFilteredReadRequests,
163      requestsMap.get(Metric.FILTERED_SERVER_READ)
164        - requestsMapPrev.get(Metric.FILTERED_SERVER_READ));
165    assertEquals(expectedReadRequests, resultCount);
166  }
167
168  private static void updateMetricsMap() throws IOException, InterruptedException {
169    for (Metric metric : Metric.values()) {
170      requestsMapPrev.put(metric, requestsMap.get(metric));
171    }
172
173    ServerLoad serverLoad = null;
174    RegionLoad regionLoadOuter = null;
175    boolean metricsUpdated = false;
176    for (int i = 0; i < MAX_TRY; i++) {
177      for (ServerName serverName : serverNames) {
178        serverLoad = new ServerLoad(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
179          .getLiveServerMetrics().get(serverName));
180
181        Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
182        RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
183        if (regionLoad != null) {
184          regionLoadOuter = regionLoad;
185          for (Metric metric : Metric.values()) {
186            if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
187              for (Metric metricInner : Metric.values()) {
188                requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
189              }
190              metricsUpdated = true;
191              break;
192            }
193          }
194        }
195      }
196      if (metricsUpdated) {
197        break;
198      }
199      Thread.sleep(SLEEP_MS);
200    }
201    if (!metricsUpdated) {
202      for (Metric metric : Metric.values()) {
203        requestsMap.put(metric, getReadRequest(serverLoad, regionLoadOuter, metric));
204      }
205    }
206  }
207
208  private static long getReadRequest(ServerLoad serverLoad, RegionLoad regionLoad, Metric metric) {
209    switch (metric) {
210      case REGION_READ:
211        return regionLoad.getReadRequestsCount();
212      case SERVER_READ:
213        return serverLoad.getReadRequestsCount();
214      case FILTERED_REGION_READ:
215        return regionLoad.getFilteredReadRequestsCount();
216      case FILTERED_SERVER_READ:
217        return serverLoad.getFilteredReadRequestsCount();
218      default:
219        throw new IllegalStateException();
220    }
221  }
222
223  private static void putData() throws IOException {
224    Put put;
225
226    put = new Put(ROW1);
227    put.addColumn(CF1, COL1, VAL1);
228    put.addColumn(CF1, COL2, VAL2);
229    put.addColumn(CF1, COL3, VAL3);
230    table.put(put);
231    put = new Put(ROW2);
232    put.addColumn(CF1, COL1, VAL2);  // put val2 instead of val1
233    put.addColumn(CF1, COL2, VAL2);
234    table.put(put);
235    put = new Put(ROW3);
236    put.addColumn(CF1, COL1, VAL1);
237    put.addColumn(CF1, COL2, VAL2);
238    table.put(put);
239  }
240
241  private static void putTTLExpiredData() throws IOException, InterruptedException {
242    Put put;
243
244    put = new Put(ROW1);
245    put.addColumn(CF2, COL1, VAL1);
246    put.addColumn(CF2, COL2, VAL2);
247    table.put(put);
248
249    Thread.sleep(TTL * 1000);
250
251    put = new Put(ROW2);
252    put.addColumn(CF2, COL1, VAL1);
253    put.addColumn(CF2, COL2, VAL2);
254    table.put(put);
255
256    put = new Put(ROW3);
257    put.addColumn(CF2, COL1, VAL1);
258    put.addColumn(CF2, COL2, VAL2);
259    table.put(put);
260  }
261
262  @AfterClass
263  public static void tearDownOnce() throws Exception {
264    TEST_UTIL.shutdownMiniCluster();
265  }
266
267  @Test
268  public void testReadRequestsCountNotFiltered() throws Exception {
269    int resultCount;
270    Scan scan;
271    Append append;
272    Put put;
273    Increment increment;
274    Get get;
275
276    // test for scan
277    scan = new Scan();
278    try (ResultScanner scanner = table.getScanner(scan)) {
279      resultCount = 0;
280      for (Result ignore : scanner) {
281        resultCount++;
282      }
283      testReadRequests(resultCount, 3, 0);
284    }
285
286    // test for scan
287    scan = new Scan(ROW2, ROW3);
288    try (ResultScanner scanner = table.getScanner(scan)) {
289      resultCount = 0;
290      for (Result ignore : scanner) {
291        resultCount++;
292      }
293      testReadRequests(resultCount, 1, 0);
294    }
295
296    // test for get
297    get = new Get(ROW2);
298    Result result = table.get(get);
299    resultCount = result.isEmpty() ? 0 : 1;
300    testReadRequests(resultCount, 1, 0);
301
302    // test for increment
303    increment = new Increment(ROW1);
304    increment.addColumn(CF1, COL3, 1);
305    result = table.increment(increment);
306    resultCount = result.isEmpty() ? 0 : 1;
307    testReadRequests(resultCount, 1, 0);
308
309    // test for checkAndPut
310    put = new Put(ROW1);
311    put.addColumn(CF1, COL2, VAL2);
312    boolean checkAndPut =
313        table.checkAndMutate(ROW1, CF1).qualifier(COL2).ifEquals(VAL2).thenPut(put);
314    resultCount = checkAndPut ? 1 : 0;
315    testReadRequests(resultCount, 1, 0);
316
317    // test for append
318    append = new Append(ROW1);
319    append.addColumn(CF1, COL2, VAL2);
320    result = table.append(append);
321    resultCount = result.isEmpty() ? 0 : 1;
322    testReadRequests(resultCount, 1, 0);
323
324    // test for checkAndMutate
325    put = new Put(ROW1);
326    put.addColumn(CF1, COL1, VAL1);
327    RowMutations rm = new RowMutations(ROW1);
328    rm.add(put);
329    boolean checkAndMutate =
330        table.checkAndMutate(ROW1, CF1).qualifier(COL1).ifEquals(VAL1).thenMutate(rm);
331    resultCount = checkAndMutate ? 1 : 0;
332    testReadRequests(resultCount, 1, 0);
333  }
334
335  @Ignore // HBASE-19785
336  @Test
337  public void testReadRequestsCountWithFilter() throws Exception {
338    int resultCount;
339    Scan scan;
340
341    // test for scan
342    scan = new Scan();
343    scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
344    try (ResultScanner scanner = table.getScanner(scan)) {
345      resultCount = 0;
346      for (Result ignore : scanner) {
347        resultCount++;
348      }
349      testReadRequests(resultCount, 2, 1);
350    }
351
352    // test for scan
353    scan = new Scan();
354    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
355    try (ResultScanner scanner = table.getScanner(scan)) {
356      resultCount = 0;
357      for (Result ignore : scanner) {
358        resultCount++;
359      }
360      testReadRequests(resultCount, 1, 2);
361    }
362
363    // test for scan
364    scan = new Scan(ROW2, ROW3);
365    scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
366    try (ResultScanner scanner = table.getScanner(scan)) {
367      resultCount = 0;
368      for (Result ignore : scanner) {
369        resultCount++;
370      }
371      testReadRequests(resultCount, 0, 1);
372    }
373
374    // fixme filtered get should not increase readRequestsCount
375//    Get get = new Get(ROW2);
376//    get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
377//    Result result = table.get(get);
378//    resultCount = result.isEmpty() ? 0 : 1;
379//    testReadRequests(resultCount, 0, 1);
380  }
381
382  @Ignore // HBASE-19785
383  @Test
384  public void testReadRequestsCountWithDeletedRow() throws Exception {
385    try {
386      Delete delete = new Delete(ROW3);
387      table.delete(delete);
388
389      Scan scan = new Scan();
390      try (ResultScanner scanner = table.getScanner(scan)) {
391        int resultCount = 0;
392        for (Result ignore : scanner) {
393          resultCount++;
394        }
395        testReadRequests(resultCount, 2, 1);
396      }
397    } finally {
398      Put put = new Put(ROW3);
399      put.addColumn(CF1, COL1, VAL1);
400      put.addColumn(CF1, COL2, VAL2);
401      table.put(put);
402    }
403  }
404
405  @Test
406  public void testReadRequestsCountWithTTLExpiration() throws Exception {
407    putTTLExpiredData();
408
409    Scan scan = new Scan();
410    scan.addFamily(CF2);
411    try (ResultScanner scanner = table.getScanner(scan)) {
412      int resultCount = 0;
413      for (Result ignore : scanner) {
414        resultCount++;
415      }
416      testReadRequests(resultCount, 2, 1);
417    }
418  }
419
420  @Ignore // See HBASE-19785
421  @Test
422  public void testReadRequestsWithCoprocessor() throws Exception {
423    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
424    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
425    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
426    builder.setCoprocessor(ScanRegionCoprocessor.class.getName());
427    admin.createTable(builder.build());
428
429    try {
430      TEST_UTIL.waitTableAvailable(tableName);
431      List<RegionInfo> regionInfos = admin.getRegions(tableName);
432      assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size());
433      boolean success = true;
434      int i = 0;
435      for (; i < MAX_TRY; i++) {
436        try {
437          testReadRequests(regionInfos.get(0).getRegionName(), 3);
438        } catch (Throwable t) {
439          LOG.warn("Got exception when try " + i + " times", t);
440          Thread.sleep(SLEEP_MS);
441          success = false;
442        }
443        if (success) {
444          break;
445        }
446      }
447      if (i == MAX_TRY) {
448        fail("Failed to get right read requests metric after try " + i + " times");
449      }
450    } finally {
451      admin.disableTable(tableName);
452      admin.deleteTable(tableName);
453    }
454  }
455
456  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
457    for (ServerName serverName : serverNames) {
458      ServerLoad serverLoad = new ServerLoad(admin.getClusterMetrics(
459        EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().get(serverName));
460      Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
461      RegionLoad regionLoad = regionsLoad.get(regionName);
462      if (regionLoad != null) {
463        LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
464            + ", region read request is " + regionLoad.getReadRequestsCount());
465        assertEquals(3, serverLoad.getReadRequestsCount());
466        assertEquals(3, regionLoad.getReadRequestsCount());
467      }
468    }
469  }
470
471  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
472    @Override
473    public Optional<RegionObserver> getRegionObserver() {
474      return Optional.of(this);
475    }
476
477    @Override
478    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
479      RegionCoprocessorEnvironment env = c.getEnvironment();
480      Region region = env.getRegion();
481      try {
482        putData(region);
483        RegionScanner scanner = region.getScanner(new Scan());
484        List<Cell> result = new LinkedList<>();
485        while (scanner.next(result)) {
486          result.clear();
487        }
488      } catch (Exception e) {
489        LOG.warn("Got exception in coprocessor", e);
490      }
491    }
492
493    private void putData(Region region) throws Exception {
494      Put put = new Put(ROW1);
495      put.addColumn(CF1, COL1, VAL1);
496      region.put(put);
497      put = new Put(ROW2);
498      put.addColumn(CF1, COL1, VAL1);
499      region.put(put);
500      put = new Put(ROW3);
501      put.addColumn(CF1, COL1, VAL1);
502      region.put(put);
503    }
504  }
505
506  private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
507}