001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.instanceOf; 023import static org.junit.jupiter.api.Assertions.assertEquals; 024import static org.junit.jupiter.api.Assertions.assertFalse; 025import static org.junit.jupiter.api.Assertions.assertNotNull; 026import static org.junit.jupiter.api.Assertions.assertNull; 027import static org.junit.jupiter.api.Assertions.assertSame; 028import static org.junit.jupiter.api.Assertions.assertTrue; 029import static org.mockito.ArgumentMatchers.anyList; 030import static org.mockito.Mockito.spy; 031import static org.mockito.Mockito.times; 032import static org.mockito.Mockito.verify; 033 034import java.io.IOException; 035import java.util.Arrays; 036import java.util.HashSet; 037import java.util.Map; 038import java.util.Set; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FileSystem; 041import org.apache.hadoop.fs.Path; 042import org.apache.hadoop.hbase.Cell; 043import org.apache.hadoop.hbase.HBaseTestingUtil; 044import org.apache.hadoop.hbase.HConstants; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 047import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo; 048import org.apache.hadoop.hbase.filter.FilterBase; 049import org.apache.hadoop.hbase.io.hfile.BlockCache; 050import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache; 051import org.apache.hadoop.hbase.regionserver.HStore; 052import org.apache.hadoop.hbase.regionserver.HStoreFile; 053import org.apache.hadoop.hbase.regionserver.RegionScanner; 054import org.apache.hadoop.hbase.regionserver.StoreScanner; 055import org.apache.hadoop.hbase.testclassification.ClientTests; 056import org.apache.hadoop.hbase.testclassification.SmallTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.junit.jupiter.api.AfterAll; 059import org.junit.jupiter.api.BeforeAll; 060import org.junit.jupiter.api.BeforeEach; 061import org.junit.jupiter.api.Tag; 062import org.junit.jupiter.api.Test; 063import org.junit.jupiter.api.TestInfo; 064 065@Tag(SmallTests.TAG) 066@Tag(ClientTests.TAG) 067public class TestClientSideRegionScanner { 068 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 069 private static final TableName TABLE_NAME = TableName.valueOf("test"); 070 private static final byte[] FAM_NAME = Bytes.toBytes("f"); 071 072 private Configuration conf; 073 private Path rootDir; 074 private FileSystem fs; 075 private TableDescriptor htd; 076 private RegionInfo hri; 077 private Scan scan; 078 private String methodName; 079 080 @BeforeAll 081 public static void setUpBeforeClass() throws Exception { 082 TEST_UTIL.startMiniCluster(1); 083 } 084 085 @AfterAll 086 public static void tearDownAfterClass() throws Exception { 087 TEST_UTIL.shutdownMiniCluster(); 088 } 089 090 @BeforeEach 091 public void setup(TestInfo testInfo) throws IOException { 092 this.methodName = testInfo.getTestMethod().get().getName(); 093 conf = TEST_UTIL.getConfiguration(); 094 rootDir = TEST_UTIL.getDefaultRootDirPath(); 095 fs = TEST_UTIL.getTestFileSystem(); 096 htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME); 097 hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0); 098 scan = new Scan(); 099 } 100 101 @Test 102 public void testDefaultBlockCache() throws IOException { 103 Configuration copyConf = new Configuration(conf); 104 try (ClientSideRegionScanner clientSideRegionScanner = 105 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) { 106 BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache(); 107 assertNotNull(blockCache); 108 assertThat(blockCache, instanceOf(IndexOnlyLruBlockCache.class)); 109 assertEquals(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT, 110 blockCache.getMaxSize()); 111 } 112 } 113 114 @Test 115 public void testConfiguredBlockCache() throws IOException { 116 Configuration copyConf = new Configuration(conf); 117 // tiny 1MB fixed cache size 118 long blockCacheFixedSize = 1024 * 1024L; 119 copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize); 120 try (ClientSideRegionScanner clientSideRegionScanner = 121 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) { 122 BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache(); 123 assertNotNull(blockCache); 124 assertThat(blockCache, instanceOf(IndexOnlyLruBlockCache.class)); 125 assertEquals(blockCacheFixedSize, blockCache.getMaxSize()); 126 } 127 } 128 129 @Test 130 public void testNoBlockCache() throws IOException { 131 Configuration copyConf = new Configuration(conf); 132 copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); 133 try (ClientSideRegionScanner clientSideRegionScanner = 134 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) { 135 BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache(); 136 assertNull(blockCache); 137 } 138 } 139 140 @Test 141 public void testContinuesToScanIfHasMore() throws IOException { 142 // Conditions for this test to set up RegionScannerImpl to bail on the scan 143 // after a single iteration 144 // 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately 145 // 2. Configure a filter to filter out some rows, in this case rows with values < 5 146 // 3. Configure the filter's hasFilterRow to return true so RegionScannerImpl sets 147 // the limitScope to something with a depth of 0, so we bail on the scan after the first 148 // iteration 149 150 Configuration copyConf = new Configuration(conf); 151 copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1); 152 Scan scan = new Scan(); 153 scan.setFilter(new FiltersRowsLessThan5()); 154 scan.setLimit(1); 155 156 try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) { 157 TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME); 158 RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0); 159 160 for (int i = 0; i < 10; ++i) { 161 table.put(createPut(i)); 162 } 163 164 // Flush contents to disk so we can scan the fs 165 TEST_UTIL.getAdmin().flush(TABLE_NAME); 166 167 try (ClientSideRegionScanner clientSideRegionScanner = 168 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) { 169 RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner); 170 clientSideRegionScanner.scanner = scannerSpy; 171 Result result = clientSideRegionScanner.next(); 172 173 verify(scannerSpy, times(6)).nextRaw(anyList()); 174 assertNotNull(result); 175 assertEquals(Bytes.toInt(result.getRow()), 5); 176 assertTrue(clientSideRegionScanner.hasMore); 177 178 for (int i = 6; i < 10; ++i) { 179 result = clientSideRegionScanner.next(); 180 verify(scannerSpy, times(i + 1)).nextRaw(anyList()); 181 assertNotNull(result); 182 assertEquals(Bytes.toInt(result.getRow()), i); 183 } 184 185 result = clientSideRegionScanner.next(); 186 assertNull(result); 187 assertFalse(clientSideRegionScanner.hasMore); 188 } 189 } 190 } 191 192 @Test 193 public void testScanMetricsDisabled() throws IOException { 194 Configuration copyConf = new Configuration(conf); 195 Scan scan = new Scan(); 196 try (ClientSideRegionScanner clientSideRegionScanner = 197 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) { 198 clientSideRegionScanner.next(); 199 assertNull(clientSideRegionScanner.getScanMetrics()); 200 } 201 } 202 203 private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics) 204 throws IOException { 205 Configuration copyConf = new Configuration(conf); 206 Scan scan = new Scan(); 207 scan.setScanMetricsEnabled(true); 208 TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME); 209 try (ClientSideRegionScanner clientSideRegionScanner = 210 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) { 211 clientSideRegionScanner.next(); 212 ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics(); 213 assertNotNull(scanMetricsFromScanner); 214 if (scanMetrics != null) { 215 assertSame(scanMetrics, scanMetricsFromScanner); 216 } 217 Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false); 218 assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0); 219 assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty()); 220 } 221 } 222 223 @Test 224 public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException { 225 testScanMetricsWithScanMetricsByRegionDisabled(null); 226 } 227 228 @Test 229 public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException { 230 testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics()); 231 } 232 233 private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException { 234 Configuration copyConf = new Configuration(conf); 235 Scan scan = new Scan(); 236 scan.setEnableScanMetricsByRegion(true); 237 TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME); 238 try (ClientSideRegionScanner clientSideRegionScanner = 239 new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) { 240 clientSideRegionScanner.next(); 241 ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics(); 242 assertNotNull(scanMetricsFromScanner); 243 if (scanMetrics != null) { 244 assertSame(scanMetrics, scanMetricsFromScanner); 245 } 246 Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = 247 scanMetricsFromScanner.collectMetricsByRegion(); 248 assertEquals(1, scanMetricsByRegion.size()); 249 for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion 250 .entrySet()) { 251 ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey(); 252 Map<String, Long> metricsMap = entry.getValue(); 253 assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName()); 254 assertNull(scanMetricsRegionInfo.getServerName()); 255 assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0); 256 assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME), 257 scanMetricsFromScanner.countOfRowsScanned.get()); 258 } 259 } 260 } 261 262 @Test 263 public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException { 264 testScanMetricByRegion(null); 265 } 266 267 @Test 268 public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException { 269 testScanMetricByRegion(new ScanMetrics()); 270 } 271 272 @Test 273 public void testGetFilesRead() throws Exception { 274 // Create a table and add some data 275 TableName tableName = TableName.valueOf(methodName); 276 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAME })) { 277 TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName); 278 RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0); 279 280 // Add some data 281 for (int i = 0; i < 5; i++) { 282 byte[] row = Bytes.toBytes(i); 283 Put put = new Put(row); 284 put.addColumn(FAM_NAME, row, row); 285 table.put(put); 286 } 287 288 // Flush contents to disk so we can scan the fs 289 TEST_UTIL.getAdmin().flush(tableName); 290 291 // Create ClientSideRegionScanner with the correct table descriptor and region info 292 Configuration copyConf = new Configuration(conf); 293 Scan tableScan = new Scan(); 294 ClientSideRegionScanner clientSideRegionScanner = 295 new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri, tableScan, null); 296 297 // Get expected file paths from the region before closing 298 // (after closing, the region will be closed too) 299 Set<Path> expectedFilePaths = new HashSet<>(); 300 HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME); 301 for (HStoreFile storeFile : store.getStorefiles()) { 302 Path qualifiedPath = fs.makeQualified(storeFile.getPath()); 303 expectedFilePaths.add(qualifiedPath); 304 } 305 int expectedFileCount = expectedFilePaths.size(); 306 assertTrue(expectedFileCount >= 1, "Should have at least one store file after flush"); 307 308 // Before closing, should return empty set 309 Set<Path> filesReadBeforeClose = clientSideRegionScanner.getFilesRead(); 310 assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing"); 311 312 // Scan through some results 313 Result result; 314 int count = 0; 315 while ((result = clientSideRegionScanner.next()) != null && count < 3) { 316 assertNotNull(result, "Result should not be null"); 317 count++; 318 } 319 320 // Still should return empty set before closing 321 filesReadBeforeClose = clientSideRegionScanner.getFilesRead(); 322 assertTrue(filesReadBeforeClose.isEmpty(), 323 "Should return empty set before closing even after scanning"); 324 325 // Close the scanner - this should collect files from the underlying scanner 326 clientSideRegionScanner.close(); 327 328 // After closing, should return files from the underlying scanner 329 Set<Path> filesReadAfterClose = clientSideRegionScanner.getFilesRead(); 330 // Verify exact file count 331 assertEquals(expectedFileCount, filesReadAfterClose.size(), 332 "Should have exact file count after closing"); 333 // Verify exact file names match 334 assertEquals(expectedFilePaths, filesReadAfterClose, 335 "Should contain all expected file paths"); 336 } finally { 337 TEST_UTIL.deleteTable(tableName); 338 } 339 } 340 341 private static Put createPut(int rowAsInt) { 342 byte[] row = Bytes.toBytes(rowAsInt); 343 Put put = new Put(row); 344 put.addColumn(FAM_NAME, row, row); 345 return put; 346 } 347 348 private static class FiltersRowsLessThan5 extends FilterBase { 349 350 @Override 351 public boolean filterRowKey(Cell cell) { 352 byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), 353 cell.getRowLength() + cell.getRowOffset()); 354 int intValue = Bytes.toInt(rowKey); 355 return intValue < 5; 356 } 357 358 @Override 359 public boolean hasFilterRow() { 360 return true; 361 } 362 } 363}