/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CompoundBloomFilter;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.NoOpIndexBlockEncoder;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

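/**
 * Tests for the server-side scan metrics that track bytes read from the filesystem, the block
 * cache and the memstore, along with the number of block read ops and RPC calls involved.
 */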
@Category({ IOTests.class, LargeTests.class })
public class TestBytesReadServerSideScanMetrics {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBytesReadServerSideScanMetrics.class);

  @Rule
  public TestName name = new TestName();

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBytesReadServerSideScanMetrics.class);

  private HBaseTestingUtil UTIL;

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  private static final byte[] ROW2 = Bytes.toBytes("row2");
  private static final byte[] ROW3 = Bytes.toBytes("row3");
  private static final byte[] ROW4 = Bytes.toBytes("row4");

  private Configuration conf;

  @Before
  public void setUp() throws Exception {
    UTIL = new HBaseTestingUtil();
    conf = UTIL.getConfiguration();
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
    conf.setBoolean(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, false);
  }

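  /**
   * Asserts that no scan metrics are returned when metrics collection is not enabled on the Scan.
   */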
  @Test
  public void testScanMetricsDisabled() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setCaching(1);
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        Assert.assertNull(scanner.getScanMetrics());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

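  /**
   * Asserts that, with the block cache disabled, a single-row scan reads everything from the
   * filesystem, and verifies the byte and read-op counts against an independent re-read of the
   * HFiles.
   */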
  @Test
  public void testBytesReadFromFsForSerialSeeks() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.ROW);
      writeData(tableName, true);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);

      // Use the oldest timestamp to make sure the fake key is not less than the first key in
      // the file containing the key: row2
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), keyValue,
        scanMetrics.blockReadOpsCount.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

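  /**
   * Same as the serial-seek test, but with parallel seeking enabled so that each HFile is seeked
   * through the RS_PARALLEL_SEEK executor; also asserts that the executor ran one task per HFile.
   */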
  @Test
  public void testBytesReadFromFsForParallelSeeks() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    // This property doesn't work correctly if only applied at the column family level.
    conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, true);
      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
      ThreadPoolExecutor executor =
        server.getExecutorService().getExecutorThreadPool(ExecutorType.RS_PARALLEL_SEEK);
      long tasksCompletedBeforeRead = executor.getCompletedTaskCount();
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
      long tasksCompletedAfterRead = executor.getCompletedTaskCount();
      // Assert that both of the HFiles were read using the parallel seek executor
      Assert.assertEquals(2, tasksCompletedAfterRead - tasksCompletedBeforeRead);

      // Use the oldest timestamp to make sure the fake key is not less than the first key in
      // the file containing the key: row2
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), keyValue,
        scanMetrics.blockReadOpsCount.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

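  /**
   * Asserts that once the blocks have been cached by a first scan, a second scan reads all its
   * bytes from the block cache and nothing from the filesystem or the memstore.
   */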
  @Test
  public void testBytesReadFromBlockCache() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.NONE);
      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
      LruBlockCache blockCache = (LruBlockCache) server.getBlockCache().get();

      // Assert that the acceptable size of the LRU block cache is greater than 1MB
      Assert.assertTrue(blockCache.acceptableSize() > 1024 * 1024);
      writeData(tableName, true);
      readDataAndGetScanMetrics(tableName, false);
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBlockCacheWarmUp(tableName, keyValue);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
      assertBytesReadFromBlockCache(tableName, scanMetrics.bytesReadFromBlockCache.get(), keyValue);
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

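  /**
   * Asserts that when the data has never been flushed, all bytes are read from the memstore and
   * the count matches the size of the cells the scan had to touch.
   */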
  @Test
  public void testBytesReadFromMemstore() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, false);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);

      // Assert that no flush has happened for the table
      List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
      for (HRegion region : regions) {
        HStore store = region.getStore(CF);
        // Assert that no HFile is there
        Assert.assertEquals(0, store.getStorefiles().size());
      }

      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE);
      int singleKeyValueSize = Segment.getCellLength(keyValue);
      // The first key value will be read when doing the seek and the second when doing next() to
      // determine that there are no more cells in the row. Key values read on SegmentScanner
      // instance creation are not counted.
      int totalKeyValueSize = 2 * singleKeyValueSize;
      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(totalKeyValueSize, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

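  /**
   * Asserts that filesystem bytes are accounted correctly when the scan starts with pread and
   * switches to stream read after crossing STORESCANNER_PREAD_MAX_BYTES.
   */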
  @Test
  public void testBytesReadWithSwitchFromPReadToStream() throws Exception {
    // Set pread max bytes to 3 to make sure that the first row is read using pread and the second
    // one using stream read
    Map<String, String> configuration = new HashMap<>();
    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, "3");
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW, configuration);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setScanMetricsEnabled(true);
      // Set caching to 1 so that one row is read via PREAD and the other via STREAM
      scan.setCaching(1);
      ScanMetrics scanMetrics = null;
      StoreScanner.instrument();
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        Assert.assertFalse(StoreScanner.hasSwitchedToStreamRead());
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertTrue(StoreScanner.hasSwitchedToStreamRead());
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
      }
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 2);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
      // There are 2 HFiles, so 1 read op per HFile was done by the actual scan to read the data
      // block. No bloom blocks will be read as this is a non-Get scan and the only bloom filter
      // type is ROW.
      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
      // With scan caching set to 1 and 2 rows being scanned, 2 RPC calls will be needed.
      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

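  /**
   * Asserts how the bytes read are split between the memstore and the newly flushed HFile when a
   * flush lands in the middle of a scan.
   */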
  @Test
  public void testBytesReadWhenFlushHappenedInTheMiddleOfScan() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW);
      writeData(tableName, false);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setScanMetricsEnabled(true);
      // Set caching to 1 so that one row is read per RPC call
      scan.setCaching(1);
      // Set max result size to 2 bytes so that both rows are not read into the scanner cache
      // before even the first call to scanner.next()
      scan.setMaxResultSize(2);
      ScanMetrics scanMetrics = null;
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        flushAndWaitUntilFlushed(tableName, true);
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
      }

      // Only 1 HFile will be created and it will have only one data block.
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 1);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());

      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());

      // The flush happens after the first row is returned from the server but before the second
      // row is returned. So, 2 cells will be read from the memstore: the cell for the first row
      // and the next cell at which scanning stops. There is 1 cell per row.
      int bytesReadFromMemstore =
        Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE));
      Assert.assertEquals(2 * bytesReadFromMemstore, scanMetrics.bytesReadFromMemstore.get());

      // There will be 1 read op to read the only data block present in the HFile.
      Assert.assertEquals(1, scanMetrics.blockReadOpsCount.get());

      // 3 RPC calls are expected: one per row and one final call to find the scanner exhausted.
      Assert.assertEquals(3, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

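  /**
   * Asserts the bytes read during a reverse scan, where seeking to the previous row re-reads a
   * data block from the block cache.
   */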
  @Test
  public void testBytesReadInReverseScan() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW4, true);
      scan.withStopRow(ROW2, true);
      scan.setScanMetricsEnabled(true);
      scan.setReversed(true);
      // Set caching to 1 so that one row is read per RPC call
      scan.setCaching(1);
      ScanMetrics scanMetrics = null;
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
        LOG.info("Scan metrics: {}", scanMetrics);
      }

      // 1 data block per HFile was read.
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 2);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());

      // For the HFile containing both rows, the data block will be read from the block cache when
      // KeyValueHeap.next() is called to read the second row. KeyValueHeap.next() calls
      // StoreFileScanner.next() when on ROW4, the last row of the file, causing curBlock to be
      // set to null in the underlying HFileScanner. As curBlock is null, kvNext will be null and
      // StoreFileScanner.seekToPreviousRow() will be called. Since the HFileScanner's curBlock is
      // null, StoreFileScanner.seekToPreviousRow() will load the data block from the BlockCache.
      // So, 1 data block will be read from the block cache.
      Assert.assertEquals(bytesReadFromFs / 2, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());

      // 1 read op per HFile was done by the actual scan to read the data block.
      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());

      // 2 RPC calls will be made
      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

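  /**
   * Asserts that a lazy seek on the HFile is never materialized when the memstore alone can serve
   * the latest version of the cell, so no filesystem or block cache bytes are read.
   */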
  @Test
  public void testBytesReadWithLazySeek() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.NONE);
      writeData(tableName, true);
      try (Table table = UTIL.getConnection().getTable(tableName)) {
        byte[] newValue = Bytes.toBytes("new value");
        // Update the value of ROW2 and let it stay in the memstore. Will assert that the lazy
        // seek doesn't lead to a real seek on the HFile.
        table.put(new Put(ROW2).addColumn(CF, CQ, newValue));
        Scan scan = new Scan();
        scan.withStartRow(ROW2, true);
        scan.withStopRow(ROW2, true);
        scan.setScanMetricsEnabled(true);
        Map<byte[], NavigableSet<byte[]>> familyMap = new HashMap<>();
        familyMap.put(CF, new TreeSet<>(Bytes.BYTES_COMPARATOR));
        familyMap.get(CF).add(CQ);
        scan.setFamilyMap(familyMap);
        ScanMetrics scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
            Assert.assertArrayEquals(newValue, r.getValue(CF, CQ));
          }
          Assert.assertEquals(1, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }
        // No real seek should be done on the HFile.
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());

        // The cell should be coming purely from the memstore.
        int cellSize =
          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, newValue));
        Assert.assertEquals(cellSize, scanMetrics.bytesReadFromMemstore.get());
        Assert.assertEquals(1, scanMetrics.countOfRPCcalls.get());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test consecutive calls to RegionScannerImpl.next() to make sure populating scan metrics from
   * ThreadLocalServerSideScanMetrics is done correctly.
   */
  @Test
  public void testConsecutiveRegionScannerNextCalls() throws Exception {
    // We will be setting a very small block size, so make sure the pread max bytes is big enough
    Map<String, String> configuration = new HashMap<>();
    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, Integer.toString(64 * 1024));
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      // Set the block size to 4 bytes to get 1 row per data block in the HFile.
      createTable(tableName, true, BloomType.NONE, 4, configuration);
      try (Table table = UTIL.getConnection().getTable(tableName)) {
        // Add 3 rows to the table.
        table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
        table.put(new Put(ROW3).addColumn(CF, CQ, VALUE));
        table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));

        ScanMetrics scanMetrics = null;

        // Scan the added rows. The rows should be read from the memstore.
        Scan scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that the rows were read from the memstore only and involved 2 RPC calls.
        int cellSize =
          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE));
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(3 * cellSize, scanMetrics.bytesReadFromMemstore.get());

        // Flush the table and make sure that the rows are read from HFiles.
        flushAndWaitUntilFlushed(tableName, false);
        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that the rows were read from the HFile and involved 2 RPC calls.
        int bytesReadFromFs = getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, true);
        Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(3, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());

        // Make sure that the rows are read from the block cache now.
        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that the rows were read from the block cache and involved 2 RPC calls.
        int bytesReadFromBlockCache =
          getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, false);
        Assert.assertEquals(bytesReadFromBlockCache, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  private Scan createScanToReadOneRowAtATimeFromServer(byte[] startRow, byte[] stopRow) {
    Scan scan = new Scan();
    scan.withStartRow(startRow, true);
    scan.withStopRow(stopRow, true);
    scan.setScanMetricsEnabled(true);
    scan.setCaching(1);
    return scan;
  }

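  /**
   * Flushes the table and polls until exactly one store file is present (and, if
   * waitForUpdatedReaders is set, until StoreScanner has picked up the new readers), failing if
   * the flush doesn't complete within the max wait time.
   */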
  private void flushAndWaitUntilFlushed(TableName tableName, boolean waitForUpdatedReaders)
    throws Exception {
    if (waitForUpdatedReaders) {
      StoreScanner.instrument();
    }
    UTIL.flush(tableName);
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    // In milliseconds
    int maxWaitTime = 100000;
    int totalWaitTime = 0;
    int sleepTime = 10000;
    while (
      store.getStorefiles().size() == 0
        || (waitForUpdatedReaders && !StoreScanner.hasUpdatedReaders())
    ) {
      Thread.sleep(sleepTime);
      totalWaitTime += sleepTime;
      if (totalWaitTime >= maxWaitTime) {
        throw new Exception("Store files not flushed after " + maxWaitTime + "ms");
      }
    }
    Assert.assertEquals(1, store.getStorefiles().size());
  }

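  /**
   * Walks all the data blocks of each store file with the uncached block reader and returns the
   * bytes needed to read them back to back, crediting the prefetched next-block header when the
   * blocks are read from the filesystem (the saving doesn't apply to block cache reads).
   */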
  private int getBytesReadToReadConsecutiveDataBlocks(TableName tableName,
    int expectedStoreFileCount, int expectedDataBlockCount, boolean isReadFromFs) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    Collection<HStoreFile> storeFiles = store.getStorefiles();
    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
    int bytesReadFromFs = 0;
    for (HStoreFile storeFile : storeFiles) {
      StoreFileReader reader = storeFile.getReader();
      HFile.Reader hfileReader = reader.getHFileReader();
      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
      FixedFileTrailer trailer = hfileReader.getTrailer();
      int dataIndexLevels = trailer.getNumDataIndexLevels();
      long loadOnOpenDataOffset = trailer.getLoadOnOpenDataOffset();
      HFileBlock.BlockIterator blockIterator = blockReader.blockRange(0, loadOnOpenDataOffset);
      HFileBlock block;
      boolean readNextBlock = false;
      int blockCount = 0;
      while ((block = blockIterator.nextBlock()) != null) {
        blockCount++;
        bytesReadFromFs += block.getOnDiskSizeWithHeader();
        if (isReadFromFs && readNextBlock) {
          // This accounts for the savings we get from the prefetched header, but these savings
          // are only applicable when reading from the FS and not from the BlockCache.
          bytesReadFromFs -= block.headerSize();
          readNextBlock = false;
        }
        if (block.getNextBlockOnDiskSize() > 0) {
          bytesReadFromFs += block.headerSize();
          readNextBlock = true;
        }
        Assert.assertTrue(block.getBlockType().isData());
      }
      blockIterator.freeBlocks();
      // No intermediate or leaf index blocks are expected.
      Assert.assertEquals(1, dataIndexLevels);
      Assert.assertEquals(expectedDataBlockCount, blockCount);
    }
    return bytesReadFromFs;
  }

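  /**
   * Returns the bytes a non-Get scan is expected to read from the filesystem: the single data
   * block of each store file plus the prefetched header of the following block, if any.
   */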
  private int getBytesReadFromFsForNonGetScan(TableName tableName, int expectedStoreFileCount)
    throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    Collection<HStoreFile> storeFiles = store.getStorefiles();
    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
    int bytesReadFromFs = 0;
    for (HStoreFile storeFile : storeFiles) {
      StoreFileReader reader = storeFile.getReader();
      HFile.Reader hfileReader = reader.getHFileReader();
      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
      FixedFileTrailer trailer = hfileReader.getTrailer();
      int dataIndexLevels = trailer.getNumDataIndexLevels();
      // Read the first block of the HFile. The first block is always expected to be a DATA block
      // and the HFile is expected to have only one DATA block.
      HFileBlock block = blockReader.readBlockData(0, -1, true, true, true);
      Assert.assertTrue(block.getBlockType().isData());
      bytesReadFromFs += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesReadFromFs += block.headerSize();
      }
      block.release();
      // Each of the HFiles is expected to have only a root index but no intermediate or leaf
      // index blocks.
      Assert.assertEquals(1, dataIndexLevels);
    }
    return bytesReadFromFs;
  }

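  /**
   * Scans the single row ROW2 and returns the scan metrics from the scanner, which are null when
   * metrics collection is disabled.
   */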
  private ScanMetrics readDataAndGetScanMetrics(TableName tableName, boolean isScanMetricsEnabled)
    throws Exception {
    Scan scan = new Scan();
    scan.withStartRow(ROW2, true);
    scan.withStopRow(ROW2, true);
    scan.setScanMetricsEnabled(isScanMetricsEnabled);
    ScanMetrics scanMetrics;
    try (Table table = UTIL.getConnection().getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      int rowCount = 0;
      StoreFileScanner.instrument();
      for (Result r : scanner) {
        rowCount++;
      }
      Assert.assertEquals(1, rowCount);
      scanMetrics = scanner.getScanMetrics();
    }
    if (isScanMetricsEnabled) {
      LOG.info("Bytes read from fs: " + scanMetrics.bytesReadFromFs.get());
      LOG.info("Bytes read from block cache: " + scanMetrics.bytesReadFromBlockCache.get());
      LOG.info("Bytes read from memstore: " + scanMetrics.bytesReadFromMemstore.get());
      LOG.info("Count of block bytes scanned: " + scanMetrics.countOfBlockBytesScanned.get());
      LOG.info("StoreFileScanner seek count: " + StoreFileScanner.getSeekCount());
    }
    return scanMetrics;
  }

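  /**
   * Writes row2 and row4 followed by row1 and row5, flushing after each pair when requested so
   * that two HFiles are created and row2 and row4 land in the same file.
   */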
  private void writeData(TableName tableName, boolean shouldFlush) throws Exception {
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
      table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
      if (shouldFlush) {
        // Create an HFile
        UTIL.flush(tableName);
      }

      table.put(new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, VALUE));
      table.put(new Put(Bytes.toBytes("row5")).addColumn(CF, CQ, VALUE));
      if (shouldFlush) {
        // Create an HFile
        UTIL.flush(tableName);
      }
    }
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType)
    throws Exception {
    createTable(tableName, blockCacheEnabled, bloomType, HConstants.DEFAULT_BLOCKSIZE,
      new HashMap<>());
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType,
    Map<String, String> configuration) throws Exception {
    createTable(tableName, blockCacheEnabled, bloomType, HConstants.DEFAULT_BLOCKSIZE,
      configuration);
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType,
    int blocksize, Map<String, String> configuration) throws Exception {
    Admin admin = UTIL.getAdmin();
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(CF);
    columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
    columnFamilyDescriptorBuilder.setBlockCacheEnabled(blockCacheEnabled);
    columnFamilyDescriptorBuilder.setBlocksize(blocksize);
    for (Map.Entry<String, String> entry : configuration.entrySet()) {
      columnFamilyDescriptorBuilder.setConfiguration(entry.getKey(), entry.getValue());
    }
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
    admin.createTable(tableDescriptorBuilder.build());
    UTIL.waitUntilAllRegionsAssigned(tableName);
  }

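  /**
   * Re-reads each store file with the uncached block reader, following the same bloom check and
   * index walk a Get scan would do, and asserts that the expected filesystem bytes and read ops
   * match the metrics reported by the scan.
   */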
  private void assertBytesReadFromFs(TableName tableName, long actualBytesReadFromFs,
    KeyValue keyValue, long actualReadOps) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    MutableInt totalExpectedBytesReadFromFs = new MutableInt(0);
    MutableInt totalExpectedReadOps = new MutableInt(0);
    for (HRegion region : regions) {
      Assert.assertNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf = bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction = block -> {
          totalExpectedBytesReadFromFs.add(block.getOnDiskSizeWithHeader());
          if (block.getNextBlockOnDiskSize() > 0) {
            totalExpectedBytesReadFromFs.add(block.headerSize());
          }
          totalExpectedReadOps.add(1);
        };
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
    Assert.assertEquals(totalExpectedBytesReadFromFs.longValue(), actualBytesReadFromFs);
    Assert.assertEquals(totalExpectedReadOps.longValue(), actualReadOps);
  }

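  /**
   * Reads the given key from an HFile the same way a seek would: the bloom chunk first (when a
   * bloom filter is present), then the root index and any intermediate or leaf index blocks down
   * to the data block, feeding every block read to bytesReadFunction.
   */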
  private void readHFile(HFile.Reader hfileReader, CompoundBloomFilter cbf, KeyValue keyValue,
    Consumer<HFileBlock> bytesReadFunction) throws Exception {
    HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
    FixedFileTrailer trailer = hfileReader.getTrailer();
    HFileContext meta = hfileReader.getFileContext();
    long fileSize = hfileReader.length();

    // Read the bloom block from the FS
    if (cbf != null) {
      // Read a block in the load-on-open section to make sure the prefetched header is not the
      // bloom block's header
      blockReader.readBlockData(trailer.getLoadOnOpenDataOffset(), -1, true, true, true).release();

      HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
      byte[] row = ROW2;
      int blockIndex = index.rootBlockContainingKey(row, 0, row.length);
      HFileBlock bloomBlock = cbf.getBloomBlock(blockIndex);
      boolean fileContainsKey = BloomFilterUtil.contains(row, 0, row.length,
        bloomBlock.getBufferReadOnly(), bloomBlock.headerSize(),
        bloomBlock.getUncompressedSizeWithoutHeader(), cbf.getHash(), cbf.getHashCount());
      bytesReadFunction.accept(bloomBlock);
      // Assert that the block read is a bloom block
      Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
      bloomBlock.release();
      if (!fileContainsKey) {
        // Key is not in the file, so we don't need to read the data block
        return;
      }
    }

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);

    // The comparator class name is stored in the trailer in version 3.
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int blockLevelsRead = 1; // Root index is the first level

    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesReadFunction.accept(block);
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();
    blockIter.freeBlocks();

    Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
  }

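  /**
   * Re-reads each store file the way a seek would and asserts that the expected byte count
   * matches the bytes the scan reported as read from the block cache.
   */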
  private void assertBytesReadFromBlockCache(TableName tableName,
    long actualBytesReadFromBlockCache, KeyValue keyValue) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    MutableInt totalExpectedBytesReadFromBlockCache = new MutableInt(0);
    for (HRegion region : regions) {
      Assert.assertNotNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf = bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction = block -> {
          totalExpectedBytesReadFromBlockCache.add(block.getOnDiskSizeWithHeader());
          if (block.getNextBlockOnDiskSize() > 0) {
            totalExpectedBytesReadFromBlockCache.add(block.headerSize());
          }
        };
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
    Assert.assertEquals(totalExpectedBytesReadFromBlockCache.longValue(),
      actualBytesReadFromBlockCache);
  }

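  /**
   * Asserts that every block a seek for the given key would touch is already present in the block
   * cache.
   */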
  private void assertBlockCacheWarmUp(TableName tableName, KeyValue keyValue) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    for (HRegion region : regions) {
      Assert.assertNotNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf = bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction =
          block -> assertBlockIsCached(hfileReader, block, region.getBlockCache());
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
  }

  private void assertBlockIsCached(HFile.Reader hfileReader, HFileBlock block,
    BlockCache blockCache) {
    if (blockCache == null) {
      return;
    }
    Path path = hfileReader.getPath();
    BlockCacheKey key = new BlockCacheKey(path, block.getOffset(), true, block.getBlockType());
    HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(key, true, false, true);
    Assert.assertNotNull(cachedBlock);
    Assert.assertEquals(block.getOnDiskSizeWithHeader(), cachedBlock.getOnDiskSizeWithHeader());
    Assert.assertEquals(block.getNextBlockOnDiskSize(), cachedBlock.getNextBlockOnDiskSize());
    cachedBlock.release();
  }

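  /**
   * Exposes the block offsets and on-disk data sizes tracked by NoOpEncodedSeeker so that the
   * test can drive root-index lookups directly.
   */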
  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}