/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CompoundBloomFilter;
import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.NoOpIndexBlockEncoder;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ IOTests.class, LargeTests.class })
public class TestBytesReadServerSideScanMetrics {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBytesReadServerSideScanMetrics.class);

  @Rule
  public TestName name = new TestName();

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBytesReadServerSideScanMetrics.class);

  private HBaseTestingUtil UTIL;

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final byte[] VALUE = Bytes.toBytes("value");

  private static final byte[] ROW2 = Bytes.toBytes("row2");
  private static final byte[] ROW3 = Bytes.toBytes("row3");
  private static final byte[] ROW4 = Bytes.toBytes("row4");

  private Configuration conf;

  @Before
  public void setUp() throws Exception {
    UTIL = new HBaseTestingUtil();
    conf = UTIL.getConfiguration();
    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
    conf.setBoolean(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, false);
  }

  @Test
  public void testScanMetricsDisabled() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setCaching(1);
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        Assert.assertNull(scanner.getScanMetrics());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadFromFsForSerialSeeks() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.ROW);
      writeData(tableName, true);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);

      // Use the oldest timestamp to make sure the fake key is not less than the first key in
      // the file containing key: row2
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), keyValue,
        scanMetrics.blockReadOpsCount.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadFromFsForParallelSeeks() throws Exception {
    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    // This property doesn't work correctly if only applied at the column family level.
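    // Set it on the cluster Configuration before the mini cluster starts so that the region
    // server dispatches seeks to the RS_PARALLEL_SEEK executor asserted on below.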
    conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, true);
      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
      ThreadPoolExecutor executor =
        server.getExecutorService().getExecutorThreadPool(ExecutorType.RS_PARALLEL_SEEK);
      long tasksCompletedBeforeRead = executor.getCompletedTaskCount();
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
      long tasksCompletedAfterRead = executor.getCompletedTaskCount();
      // Assert that both of the HFiles were read using the parallel seek executor
      Assert.assertEquals(2, tasksCompletedAfterRead - tasksCompletedBeforeRead);

      // Use the oldest timestamp to make sure the fake key is not less than the first key in
      // the file containing key: row2
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), keyValue,
        scanMetrics.blockReadOpsCount.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadFromBlockCache() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.NONE);
      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
      LruBlockCache blockCache = (LruBlockCache) server.getBlockCache().get();

      // Assert that the acceptable size of the LRU block cache is greater than 1MB
      Assert.assertTrue(blockCache.acceptableSize() > 1024 * 1024);
      writeData(tableName, true);
      readDataAndGetScanMetrics(tableName, false);
      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, PrivateConstants.OLDEST_TIMESTAMP, VALUE);
      assertBlockCacheWarmUp(tableName, keyValue);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
      assertBytesReadFromBlockCache(tableName, scanMetrics.bytesReadFromBlockCache.get(), keyValue);
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadFromMemstore() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, false, BloomType.NONE);
      writeData(tableName, false);
      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);

      // Assert no flush has happened for the table
      List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
      for (HRegion region : regions) {
        HStore store = region.getStore(CF);
        // Assert no HFile is there
        Assert.assertEquals(0, store.getStorefiles().size());
      }

      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE);
      int singleKeyValueSize = Segment.getCellLength(keyValue);
      // The first key value will be read on doing the seek and the second one on doing next() to
      // determine that there are no more cells in the row. We don't count key values read on
      // SegmentScanner instance creation.
      int totalKeyValueSize = 2 * singleKeyValueSize;
      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(totalKeyValueSize, scanMetrics.bytesReadFromMemstore.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadWithSwitchFromPReadToStream() throws Exception {
    // Set pread max bytes to 3 to make sure that the first row is read using pread and the
    // second one using stream read
    Map<String, String> configuration = new HashMap<>();
    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, "3");
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW, configuration);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setScanMetricsEnabled(true);
      // Set caching to 1 so that one row is read via PREAD and the other via STREAM
      scan.setCaching(1);
      ScanMetrics scanMetrics = null;
      StoreScanner.instrument();
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        Assert.assertFalse(StoreScanner.hasSwitchedToStreamRead());
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertTrue(StoreScanner.hasSwitchedToStreamRead());
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
      }
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, scanMetrics, 2);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
      // There are 2 HFiles, so 1 read op per HFile was done by the actual scan to read the data
      // block. No bloom blocks will be read as this is a non-Get scan and the only bloom filter
      // type is ROW.
      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
      // With scan caching set to 1 and 2 rows being scanned, 2 RPC calls will be needed.
      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadWhenFlushHappenedInTheMiddleOfScan() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW);
      writeData(tableName, false);
      Scan scan = new Scan();
      scan.withStartRow(ROW2, true);
      scan.withStopRow(ROW4, true);
      scan.setScanMetricsEnabled(true);
      // Set caching to 1 so that one row is read per RPC call
      scan.setCaching(1);
      // Set max result size to 2 bytes so that both rows are not read into the scanner cache
      // before even the first call to scanner.next()
      scan.setMaxResultSize(2);
      ScanMetrics scanMetrics = null;
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        flushAndWaitUntilFlushed(tableName, true);
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
      }

      // Only 1 HFile will be created and it will have only one data block.
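      // writeData() didn't flush, so the single HFile read below comes from the flush triggered
      // in the middle of the scan above.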
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, scanMetrics, 1);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());

      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());

      // The flush happens after the first row is returned from the server but before the second
      // row is returned. So, 2 cells will be read from the memstore i.e. the cell for the first
      // row and the next cell at which scanning will stop. Per row we have 1 cell.
      int bytesReadFromMemstore =
        Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE));
      Assert.assertEquals(2 * bytesReadFromMemstore, scanMetrics.bytesReadFromMemstore.get());

      // There will be 1 read op to read the only data block present in the HFile.
      Assert.assertEquals(1, scanMetrics.blockReadOpsCount.get());

      // More than 1 RPC call is needed; with caching set to 1, 3 calls are made in total.
      Assert.assertEquals(3, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadInReverseScan() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.ROW);
      writeData(tableName, true);
      Scan scan = new Scan();
      scan.withStartRow(ROW4, true);
      scan.withStopRow(ROW2, true);
      scan.setScanMetricsEnabled(true);
      scan.setReversed(true);
      // Set caching to 1 so that one row is read per RPC call
      scan.setCaching(1);
      ScanMetrics scanMetrics = null;
      try (Table table = UTIL.getConnection().getTable(tableName);
        ResultScanner scanner = table.getScanner(scan)) {
        int rowCount = 0;
        for (Result r : scanner) {
          rowCount++;
        }
        Assert.assertEquals(2, rowCount);
        scanMetrics = scanner.getScanMetrics();
        LOG.info("Scan metrics: {}", scanMetrics);
      }

      // 1 data block per HFile was read.
      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, scanMetrics, 2);
      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());

      // For the HFile containing both the rows, the data block will be read from the block cache
      // when KeyValueHeap.next() is called to read the second row.
      // KeyValueHeap.next() will call StoreFileScanner.next() when on ROW4, which is the last row
      // of the file, causing curBlock to be set to null in the underlying HFileScanner. As
      // curBlock is null, kvNext will be null and StoreFileScanner.seekToPreviousRow() will be
      // called. As the curBlock of the HFileScanner is null, seekToPreviousRow() will load the
      // data block from the BlockCache. So, 1 data block will be read from the block cache.
      Assert.assertEquals(bytesReadFromFs / 2, scanMetrics.bytesReadFromBlockCache.get());
      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());

      // 1 read op per HFile was done by the actual scan to read the data block.
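      // writeData() flushed twice, so there are 2 HFiles and hence 2 read ops in total.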
      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());

      // 2 RPC calls will be there
      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Test
  public void testBytesReadWithLazySeek() throws Exception {
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      createTable(tableName, true, BloomType.NONE);
      writeData(tableName, true);
      try (Table table = UTIL.getConnection().getTable(tableName)) {
        byte[] newValue = Bytes.toBytes("new value");
        // Update the value of ROW2 and let it stay in the memstore. Will assert that lazy seek
        // doesn't lead to a seek on the HFile.
        table.put(new Put(ROW2).addColumn(CF, CQ, newValue));
        Scan scan = new Scan();
        scan.withStartRow(ROW2, true);
        scan.withStopRow(ROW2, true);
        scan.setScanMetricsEnabled(true);
        Map<byte[], NavigableSet<byte[]>> familyMap = new HashMap<>();
        familyMap.put(CF, new TreeSet<>(Bytes.BYTES_COMPARATOR));
        familyMap.get(CF).add(CQ);
        scan.setFamilyMap(familyMap);
        ScanMetrics scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
            Assert.assertArrayEquals(newValue, r.getValue(CF, CQ));
          }
          Assert.assertEquals(1, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }
        // No real seek should be done on the HFile.
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());

        // The cell should be coming purely from the memstore.
        int cellSize =
          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, newValue));
        Assert.assertEquals(cellSize, scanMetrics.bytesReadFromMemstore.get());
        Assert.assertEquals(1, scanMetrics.countOfRPCcalls.get());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Test consecutive calls to RegionScannerImpl.next() to make sure populating scan metrics from
   * ThreadLocalServerSideScanMetrics is done correctly.
   */
  @Test
  public void testConsecutiveRegionScannerNextCalls() throws Exception {
    // We will be setting a very small block size, so make sure the pread max bytes limit is big
    // enough
    Map<String, String> configuration = new HashMap<>();
    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, Integer.toString(64 * 1024));
    UTIL.startMiniCluster();
    try {
      TableName tableName = TableName.valueOf(name.getMethodName());
      // Set the block size to 4 bytes to get 1 row per data block in the HFile.
      createTable(tableName, true, BloomType.NONE, 4, configuration);
      try (Table table = UTIL.getConnection().getTable(tableName)) {
        // Add 3 rows to the table.
        table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
        table.put(new Put(ROW3).addColumn(CF, CQ, VALUE));
        table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));

        ScanMetrics scanMetrics = null;

        // Scan the added rows. The rows should be read from the memstore.
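        // The scan covers ROW2 to ROW3 inclusively with caching set to 1, so each row comes back
        // in its own RPC call.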
        Scan scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that rows were read only from the memstore and involved 2 RPC calls.
        int cellSize =
          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, HConstants.LATEST_TIMESTAMP, VALUE));
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(3 * cellSize, scanMetrics.bytesReadFromMemstore.get());

        // Flush the table and make sure that the rows are read from HFiles.
        flushAndWaitUntilFlushed(tableName, false);
        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that rows were read from HFiles and involved 2 RPC calls.
        int bytesReadFromFs = getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, true);
        Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(3, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());

        // Make sure that rows are read from the Blockcache now.
        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
        scanMetrics = null;
        try (ResultScanner scanner = table.getScanner(scan)) {
          int rowCount = 0;
          for (Result r : scanner) {
            rowCount++;
          }
          Assert.assertEquals(2, rowCount);
          scanMetrics = scanner.getScanMetrics();
        }

        // Assert that rows were read from the Blockcache and involved 2 RPC calls.
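        // The previous scan read the data blocks from the HFile and populated the block cache,
        // so this pass should be served entirely from the cache.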
        int bytesReadFromBlockCache =
          getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, false);
        Assert.assertEquals(bytesReadFromBlockCache, scanMetrics.bytesReadFromBlockCache.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
      }
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  private Scan createScanToReadOneRowAtATimeFromServer(byte[] startRow, byte[] stopRow) {
    Scan scan = new Scan();
    scan.withStartRow(startRow, true);
    scan.withStopRow(stopRow, true);
    scan.setScanMetricsEnabled(true);
    scan.setCaching(1);
    return scan;
  }

  private void flushAndWaitUntilFlushed(TableName tableName, boolean waitForUpdatedReaders)
    throws Exception {
    if (waitForUpdatedReaders) {
      StoreScanner.instrument();
    }
    UTIL.flush(tableName);
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    // In milliseconds
    int maxWaitTime = 100000;
    int totalWaitTime = 0;
    int sleepTime = 10000;
    while (
      store.getStorefiles().size() == 0
        || (waitForUpdatedReaders && !StoreScanner.hasUpdatedReaders())
    ) {
      Thread.sleep(sleepTime);
      totalWaitTime += sleepTime;
      if (totalWaitTime >= maxWaitTime) {
        throw new Exception("Store files not flushed after " + maxWaitTime + "ms");
      }
    }
    Assert.assertEquals(1, store.getStorefiles().size());
  }

  private int getBytesReadToReadConsecutiveDataBlocks(TableName tableName,
    int expectedStoreFileCount, int expectedDataBlockCount, boolean isReadFromFs) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    Collection<HStoreFile> storeFiles = store.getStorefiles();
    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
    int bytesReadFromFs = 0;
    for (HStoreFile storeFile : storeFiles) {
      StoreFileReader reader = storeFile.getReader();
      HFile.Reader hfileReader = reader.getHFileReader();
      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
      FixedFileTrailer trailer = hfileReader.getTrailer();
      int dataIndexLevels = trailer.getNumDataIndexLevels();
      long loadOnOpenDataOffset = trailer.getLoadOnOpenDataOffset();
      HFileBlock.BlockIterator blockIterator = blockReader.blockRange(0, loadOnOpenDataOffset);
      HFileBlock block;
      boolean readNextBlock = false;
      int blockCount = 0;
      while ((block = blockIterator.nextBlock()) != null) {
        blockCount++;
        bytesReadFromFs += block.getOnDiskSizeWithHeader();
        if (isReadFromFs && readNextBlock) {
          // This accounts for the savings we get from the prefetched header, but these savings
          // are only applicable when reading from the FS and not from the BlockCache.
          bytesReadFromFs -= block.headerSize();
          readNextBlock = false;
        }
        if (block.getNextBlockOnDiskSize() > 0) {
          bytesReadFromFs += block.headerSize();
          readNextBlock = true;
        }
        Assert.assertTrue(block.getBlockType().isData());
      }
      blockIterator.freeBlocks();
      // No intermediate or leaf index blocks are expected.
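      // With only a handful of tiny rows, the root index covers all data blocks directly, hence
      // a single index level.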
      Assert.assertEquals(1, dataIndexLevels);
      Assert.assertEquals(expectedDataBlockCount, blockCount);
    }
    return bytesReadFromFs;
  }

  private int getBytesReadFromFsForNonGetScan(TableName tableName, ScanMetrics scanMetrics,
    int expectedStoreFileCount) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    HRegion region = regions.get(0);
    HStore store = region.getStore(CF);
    Collection<HStoreFile> storeFiles = store.getStorefiles();
    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
    int bytesReadFromFs = 0;
    for (HStoreFile storeFile : storeFiles) {
      StoreFileReader reader = storeFile.getReader();
      HFile.Reader hfileReader = reader.getHFileReader();
      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
      FixedFileTrailer trailer = hfileReader.getTrailer();
      int dataIndexLevels = trailer.getNumDataIndexLevels();
      // Read the first block of the HFile. The first block is always expected to be a DATA block
      // and the HFile is expected to have only one DATA block.
      HFileBlock block = blockReader.readBlockData(0, -1, true, true, true);
      Assert.assertTrue(block.getBlockType().isData());
      bytesReadFromFs += block.getOnDiskSizeWithHeader();
      if (block.getNextBlockOnDiskSize() > 0) {
        bytesReadFromFs += block.headerSize();
      }
      block.release();
      // Each of the HFiles is expected to have only a root index but no intermediate or leaf
      // index blocks.
      Assert.assertEquals(1, dataIndexLevels);
    }
    return bytesReadFromFs;
  }

  private ScanMetrics readDataAndGetScanMetrics(TableName tableName, boolean isScanMetricsEnabled)
    throws Exception {
    Scan scan = new Scan();
    scan.withStartRow(ROW2, true);
    scan.withStopRow(ROW2, true);
    scan.setScanMetricsEnabled(isScanMetricsEnabled);
    ScanMetrics scanMetrics;
    try (Table table = UTIL.getConnection().getTable(tableName);
      ResultScanner scanner = table.getScanner(scan)) {
      int rowCount = 0;
      StoreFileScanner.instrument();
      for (Result r : scanner) {
        rowCount++;
      }
      Assert.assertEquals(1, rowCount);
      scanMetrics = scanner.getScanMetrics();
    }
    if (isScanMetricsEnabled) {
      LOG.info("Bytes read from fs: " + scanMetrics.bytesReadFromFs.get());
      LOG.info("Bytes read from block cache: " + scanMetrics.bytesReadFromBlockCache.get());
      LOG.info("Bytes read from memstore: " + scanMetrics.bytesReadFromMemstore.get());
      LOG.info("Count of bytes scanned: " + scanMetrics.countOfBlockBytesScanned.get());
      LOG.info("StoreFileScanners seek count: " + StoreFileScanner.getSeekCount());
    }
    return scanMetrics;
  }

  private void writeData(TableName tableName, boolean shouldFlush) throws Exception {
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
      table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
      if (shouldFlush) {
        // Create an HFile
        UTIL.flush(tableName);
      }

      table.put(new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, VALUE));
      table.put(new Put(Bytes.toBytes("row5")).addColumn(CF, CQ, VALUE));
      if (shouldFlush) {
        // Create an HFile
        UTIL.flush(tableName);
      }
    }
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType)
    throws Exception {
    createTable(tableName, blockCacheEnabled, bloomType, HConstants.DEFAULT_BLOCKSIZE,
      new HashMap<>());
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType,
    Map<String, String> configuration) throws Exception {
    createTable(tableName, blockCacheEnabled, bloomType, HConstants.DEFAULT_BLOCKSIZE,
      configuration);
  }

  private void createTable(TableName tableName, boolean blockCacheEnabled, BloomType bloomType,
    int blocksize, Map<String, String> configuration) throws Exception {
    Admin admin = UTIL.getAdmin();
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(CF);
    columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
    columnFamilyDescriptorBuilder.setBlockCacheEnabled(blockCacheEnabled);
    columnFamilyDescriptorBuilder.setBlocksize(blocksize);
    for (Map.Entry<String, String> entry : configuration.entrySet()) {
      columnFamilyDescriptorBuilder.setConfiguration(entry.getKey(), entry.getValue());
    }
    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
    admin.createTable(tableDescriptorBuilder.build());
    UTIL.waitUntilAllRegionsAssigned(tableName);
  }

  private void assertBytesReadFromFs(TableName tableName, long actualBytesReadFromFs,
    KeyValue keyValue, long actualReadOps) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    MutableInt totalExpectedBytesReadFromFs = new MutableInt(0);
    MutableInt totalExpectedReadOps = new MutableInt(0);
    for (HRegion region : regions) {
      Assert.assertNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf =
          bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
          @Override
          public void accept(HFileBlock block) {
            totalExpectedBytesReadFromFs.add(block.getOnDiskSizeWithHeader());
            if (block.getNextBlockOnDiskSize() > 0) {
              totalExpectedBytesReadFromFs.add(block.headerSize());
            }
            totalExpectedReadOps.add(1);
          }
        };
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
    Assert.assertEquals(totalExpectedBytesReadFromFs.longValue(), actualBytesReadFromFs);
    Assert.assertEquals(totalExpectedReadOps.longValue(), actualReadOps);
  }

  private void readHFile(HFile.Reader hfileReader, CompoundBloomFilter cbf, KeyValue keyValue,
    Consumer<HFileBlock> bytesReadFunction) throws Exception {
    HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
    FixedFileTrailer trailer = hfileReader.getTrailer();
    HFileContext meta = hfileReader.getFileContext();
    long fileSize = hfileReader.length();

    // Read the bloom block from the FS
    if (cbf != null) {
      // Read a block in the load-on-open section to make sure the prefetched header is not the
      // bloom block's header
      blockReader.readBlockData(trailer.getLoadOnOpenDataOffset(), -1, true, true, true).release();

      HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
      byte[] row = ROW2;
      int blockIndex = index.rootBlockContainingKey(row, 0, row.length);
      HFileBlock bloomBlock = cbf.getBloomBlock(blockIndex);
      boolean fileContainsKey = BloomFilterUtil.contains(row, 0, row.length,
        bloomBlock.getBufferReadOnly(), bloomBlock.headerSize(),
        bloomBlock.getUncompressedSizeWithoutHeader(), cbf.getHash(), cbf.getHashCount());
      bytesReadFunction.accept(bloomBlock);
      // Assert that the block read is a bloom block
      Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
      bloomBlock.release();
      if (!fileContainsKey) {
        // The key is not in the file, so we don't need to read the data block
        return;
      }
    }

    // Indexes use NoOpEncodedSeeker
    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
    HFileBlock.BlockIterator blockIter = blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
      fileSize - trailer.getTrailerSize());
    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);

    // The comparator class name is stored in the trailer in version 3.
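    // createComparator() instantiates the comparator class recorded in the trailer so that index
    // keys are compared the same way they were written.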
    CellComparator comparator = trailer.createComparator();
    // Initialize the seeker
    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
      trailer.getNumDataIndexLevels());

    int blockLevelsRead = 1; // Root index is the first level

    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
    long currentOffset = seeker.getBlockOffset(rootLevIndex);
    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);

    HFileBlock prevBlock = null;
    do {
      prevBlock = block;
      block = blockReader.readBlockData(currentOffset, currentDataSize, true, true, true);
      HFileBlock unpacked = block.unpack(meta, blockReader);
      if (unpacked != block) {
        block.release();
        block = unpacked;
      }
      bytesReadFunction.accept(block);
      if (!block.getBlockType().isData()) {
        ByteBuff buffer = block.getBufferWithoutHeader();
        // Place the buffer at the correct position
        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, keyValue, comparator);
        currentOffset = buffer.getLong();
        currentDataSize = buffer.getInt();
      }
      prevBlock.release();
      blockLevelsRead++;
    } while (!block.getBlockType().isData());
    block.release();
    blockIter.freeBlocks();

    Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
  }

  private void assertBytesReadFromBlockCache(TableName tableName,
    long actualBytesReadFromBlockCache, KeyValue keyValue) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    MutableInt totalExpectedBytesReadFromBlockCache = new MutableInt(0);
    for (HRegion region : regions) {
      Assert.assertNotNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf =
          bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
          @Override
          public void accept(HFileBlock block) {
            totalExpectedBytesReadFromBlockCache.add(block.getOnDiskSizeWithHeader());
            if (block.getNextBlockOnDiskSize() > 0) {
              totalExpectedBytesReadFromBlockCache.add(block.headerSize());
            }
          }
        };
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
    Assert.assertEquals(totalExpectedBytesReadFromBlockCache.longValue(),
      actualBytesReadFromBlockCache);
  }

  private void assertBlockCacheWarmUp(TableName tableName, KeyValue keyValue) throws Exception {
    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
    Assert.assertEquals(1, regions.size());
    for (HRegion region : regions) {
      Assert.assertNotNull(region.getBlockCache());
      HStore store = region.getStore(CF);
      Collection<HStoreFile> storeFiles = store.getStorefiles();
      Assert.assertEquals(2, storeFiles.size());
      for (HStoreFile storeFile : storeFiles) {
        StoreFileReader reader = storeFile.getReader();
        HFile.Reader hfileReader = reader.getHFileReader();
        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof CompoundBloomFilter);
        CompoundBloomFilter cbf = bloomFilter == null ? null : (CompoundBloomFilter) bloomFilter;
        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
          @Override
          public void accept(HFileBlock block) {
            assertBlockIsCached(hfileReader, block, region.getBlockCache());
          }
        };
        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
      }
    }
  }

  private void assertBlockIsCached(HFile.Reader hfileReader, HFileBlock block,
    BlockCache blockCache) {
    if (blockCache == null) {
      return;
    }
    Path path = hfileReader.getPath();
    BlockCacheKey key = new BlockCacheKey(path, block.getOffset(), true, block.getBlockType());
    HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(key, true, false, true);
    Assert.assertNotNull(cachedBlock);
    Assert.assertEquals(block.getOnDiskSizeWithHeader(), cachedBlock.getOnDiskSizeWithHeader());
    Assert.assertEquals(block.getNextBlockOnDiskSize(), cachedBlock.getNextBlockOnDiskSize());
    cachedBlock.release();
  }

  private static class MyNoOpEncodedSeeker extends NoOpIndexBlockEncoder.NoOpEncodedSeeker {
    public long getBlockOffset(int i) {
      return blockOffsets[i];
    }

    public int getBlockDataSize(int i) {
      return blockDataSizes[i];
    }
  }
}