/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Tests for {@link ClientSideRegionScanner}, run against a one-node mini cluster.
 * <p>
 * Unless a test creates its own table, the scanner is opened on the first region of the meta
 * table using the descriptor and region info captured in {@link #setup()}. Every test closes the
 * scanner it opens so that one test does not leak an open scanner into the next.
 */
@Category({ SmallTests.class, ClientTests.class })
public class TestClientSideRegionScanner {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);

  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final TableName TABLE_NAME = TableName.valueOf("test");
  private static final byte[] FAM_NAME = Bytes.toBytes("f");

  private Configuration conf;
  private Path rootDir;
  private FileSystem fs;
  private TableDescriptor htd;
  private RegionInfo hri;
  private Scan scan;

  @Rule
  public TestName name = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Captures the cluster configuration, root directory and file system, and points the default
   * descriptor / region info at the first region of the meta table.
   */
  @Before
  public void setup() throws IOException {
    conf = TEST_UTIL.getConfiguration();
    rootDir = TEST_UTIL.getDefaultRootDirPath();
    fs = TEST_UTIL.getTestFileSystem();
    htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME);
    hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0);
    scan = new Scan();
  }

  /**
   * With an untouched configuration the region must get an {@link IndexOnlyLruBlockCache} whose
   * max size equals the client-scanner default.
   */
  @Test
  public void testDefaultBlockCache() throws IOException {
    Configuration copyConf = new Configuration(conf);
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
      assertNotNull(blockCache);
      assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
      assertEquals(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT,
        blockCache.getMaxSize());
    }
  }

  /**
   * An explicitly configured fixed on-heap cache size must be reflected by the block cache.
   */
  @Test
  public void testConfiguredBlockCache() throws IOException {
    Configuration copyConf = new Configuration(conf);
    // tiny 1MB fixed cache size
    long blockCacheFixedSize = 1024 * 1024L;
    copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize);
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
      assertNotNull(blockCache);
      assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
      assertEquals(blockCacheFixedSize, blockCache.getMaxSize());
    }
  }

  /**
   * Setting the block cache size fraction to zero must leave the region without a block cache.
   */
  @Test
  public void testNoBlockCache() throws IOException {
    Configuration copyConf = new Configuration(conf);
    copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
      assertNull(blockCache);
    }
  }

  /**
   * Verifies that {@code next()} keeps calling the underlying scanner while it reports more
   * results, even when individual iterations return nothing because rows were filtered out.
   */
  @Test
  public void testContinuesToScanIfHasMore() throws IOException {
    // Conditions for this test to set up RegionScannerImpl to bail on the scan
    // after a single iteration
    // 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately
    // 2. Configure a filter to filter out some rows, in this case rows with values < 5
    // 3. Configure the filter's hasFilterRow to return true so RegionScannerImpl sets
    // the limitScope to something with a depth of 0, so we bail on the scan after the first
    // iteration

    Configuration copyConf = new Configuration(conf);
    copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
    Scan filteredScan = new Scan();
    filteredScan.setFilter(new FiltersRowsLessThan5());
    filteredScan.setLimit(1);

    try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) {
      TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
      RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);

      for (int i = 0; i < 10; ++i) {
        table.put(createPut(i));
      }

      // Flush contents to disk so we can scan the fs
      TEST_UTIL.getAdmin().flush(TABLE_NAME);

      try (ClientSideRegionScanner clientSideRegionScanner = new ClientSideRegionScanner(copyConf,
        fs, rootDir, tableHtd, tableHri, filteredScan, null)) {
        // Swap in a spy so the number of nextRaw invocations can be counted.
        RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner);
        clientSideRegionScanner.scanner = scannerSpy;
        Result result = clientSideRegionScanner.next();

        // Rows 0..4 are filtered out, so the first returned row (5) takes six raw calls.
        verify(scannerSpy, times(6)).nextRaw(anyList());
        assertNotNull(result);
        assertEquals(5, Bytes.toInt(result.getRow()));
        assertTrue(clientSideRegionScanner.hasMore);

        for (int i = 6; i < 10; ++i) {
          result = clientSideRegionScanner.next();
          verify(scannerSpy, times(i + 1)).nextRaw(anyList());
          assertNotNull(result);
          assertEquals(i, Bytes.toInt(result.getRow()));
        }

        result = clientSideRegionScanner.next();
        assertNull(result);
        assertFalse(clientSideRegionScanner.hasMore);
      }
    } finally {
      // Drop the table so it does not outlive this test.
      TEST_UTIL.deleteTable(TABLE_NAME);
    }
  }

  /**
   * When scan metrics are not enabled on the scan, the scanner must not expose any metrics.
   */
  @Test
  public void testScanMetricsDisabled() throws IOException {
    Configuration copyConf = new Configuration(conf);
    Scan plainScan = new Scan();
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, plainScan, null)) {
      clientSideRegionScanner.next();
      assertNull(clientSideRegionScanner.getScanMetrics());
    }
  }

  /**
   * Scans the meta region with scan metrics enabled but per-region metrics disabled.
   * @param scanMetrics metrics object handed to the scanner, or {@code null} to let the scanner
   *                    supply its own; when non-null the scanner must return this same instance
   */
  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
    throws IOException {
    Configuration copyConf = new Configuration(conf);
    Scan metricsScan = new Scan();
    metricsScan.setScanMetricsEnabled(true);
    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, metricsScan, scanMetrics)) {
      clientSideRegionScanner.next();
      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
      assertNotNull(scanMetricsFromScanner);
      if (scanMetrics != null) {
        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
      }
      Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
      assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
      assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
    }
  }

  @Test
  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
    testScanMetricsWithScanMetricsByRegionDisabled(null);
  }

  @Test
  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
  }

  /**
   * Scans the meta region with per-region scan metrics enabled and checks that exactly one region
   * entry is reported, keyed by the scanned region's encoded name and with no server name.
   * @param scanMetrics metrics object handed to the scanner, or {@code null} to let the scanner
   *                    supply its own; when non-null the scanner must return this same instance
   */
  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
    Configuration copyConf = new Configuration(conf);
    Scan metricsScan = new Scan();
    metricsScan.setEnableScanMetricsByRegion(true);
    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
    try (ClientSideRegionScanner clientSideRegionScanner =
      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, metricsScan, scanMetrics)) {
      clientSideRegionScanner.next();
      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
      assertNotNull(scanMetricsFromScanner);
      if (scanMetrics != null) {
        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
      }
      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
        scanMetricsFromScanner.collectMetricsByRegion();
      assertEquals(1, scanMetricsByRegion.size());
      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
        .entrySet()) {
        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
        Map<String, Long> metricsMap = entry.getValue();
        assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
        assertNull(scanMetricsRegionInfo.getServerName());
        assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
        // The per-region count must agree with the scan-wide counter.
        assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
          scanMetricsFromScanner.countOfRowsScanned.get());
      }
    }
  }

  @Test
  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
    testScanMetricByRegion(null);
  }

  @Test
  public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
    testScanMetricByRegion(new ScanMetrics());
  }

  /**
   * Verifies that {@code getFilesRead()} is empty while the scanner is open and, once the scanner
   * is closed, equals the set of store files of the scanned region.
   */
  @Test
  public void testGetFilesRead() throws Exception {
    // Create a table and add some data
    TableName tableName = TableName.valueOf(name.getMethodName());
    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAME })) {
      TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName);
      RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0);

      // Add some data
      for (int i = 0; i < 5; i++) {
        table.put(createPut(i));
      }

      // Flush contents to disk so we can scan the fs
      TEST_UTIL.getAdmin().flush(tableName);

      // Create ClientSideRegionScanner with the correct table descriptor and region info
      Configuration copyConf = new Configuration(conf);
      Scan tableScan = new Scan();
      ClientSideRegionScanner clientSideRegionScanner =
        new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri, tableScan, null);
      // Tracks whether the explicit close below ran, so the finally block only closes the
      // scanner when an earlier assertion failed and never closes it a second time.
      boolean closed = false;
      try {
        // Get expected file paths from the region before closing
        // (after closing, the region will be closed too)
        Set<Path> expectedFilePaths = new HashSet<>();
        HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME);
        for (HStoreFile storeFile : store.getStorefiles()) {
          expectedFilePaths.add(fs.makeQualified(storeFile.getPath()));
        }
        int expectedFileCount = expectedFilePaths.size();
        assertTrue("Should have at least one store file after flush", expectedFileCount >= 1);

        // Before closing, should return empty set
        Set<Path> filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
        assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty());

        // Scan through some results; five rows were written, so three must be available
        for (int count = 0; count < 3; count++) {
          assertNotNull("Result should not be null", clientSideRegionScanner.next());
        }

        // Still should return empty set before closing
        filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
        assertTrue("Should return empty set before closing even after scanning",
          filesReadBeforeClose.isEmpty());

        // Close the scanner - this should collect files from the underlying scanner
        clientSideRegionScanner.close();
        closed = true;

        // After closing, should return files from the underlying scanner
        Set<Path> filesReadAfterClose = clientSideRegionScanner.getFilesRead();
        // Verify exact file count
        assertEquals("Should have exact file count after closing", expectedFileCount,
          filesReadAfterClose.size());
        // Verify exact file names match
        assertEquals("Should contain all expected file paths", expectedFilePaths,
          filesReadAfterClose);
      } finally {
        if (!closed) {
          clientSideRegionScanner.close();
        }
      }
    } finally {
      TEST_UTIL.deleteTable(tableName);
    }
  }

  /**
   * Builds a {@link Put} whose row key, qualifier and value are all the 4-byte encoding of
   * {@code rowAsInt}, in family {@link #FAM_NAME}.
   */
  private static Put createPut(int rowAsInt) {
    byte[] row = Bytes.toBytes(rowAsInt);
    Put put = new Put(row);
    put.addColumn(FAM_NAME, row, row);
    return put;
  }

  /**
   * Filter that drops every row whose key, decoded as an int, is below 5. It reports
   * {@code hasFilterRow() == true}, which the test relies on to make the region scanner return
   * after each iteration.
   */
  private static class FiltersRowsLessThan5 extends FilterBase {

    @Override
    public boolean filterRowKey(Cell cell) {
      // Copy just the row bytes out of the cell's backing array before decoding.
      byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(),
        cell.getRowLength() + cell.getRowOffset());
      int intValue = Bytes.toInt(rowKey);
      return intValue < 5;
    }

    @Override
    public boolean hasFilterRow() {
      return true;
    }
  }
}