/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.instanceOf;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertFalse;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.junit.jupiter.api.Assertions.assertNull;
027import static org.junit.jupiter.api.Assertions.assertSame;
028import static org.junit.jupiter.api.Assertions.assertTrue;
029import static org.mockito.ArgumentMatchers.anyList;
030import static org.mockito.Mockito.spy;
031import static org.mockito.Mockito.times;
032import static org.mockito.Mockito.verify;
033
034import java.io.IOException;
035import java.util.Arrays;
036import java.util.HashSet;
037import java.util.Map;
038import java.util.Set;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.Cell;
043import org.apache.hadoop.hbase.HBaseTestingUtil;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
047import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
048import org.apache.hadoop.hbase.filter.FilterBase;
049import org.apache.hadoop.hbase.io.hfile.BlockCache;
050import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
051import org.apache.hadoop.hbase.regionserver.HStore;
052import org.apache.hadoop.hbase.regionserver.HStoreFile;
053import org.apache.hadoop.hbase.regionserver.RegionScanner;
054import org.apache.hadoop.hbase.regionserver.StoreScanner;
055import org.apache.hadoop.hbase.testclassification.ClientTests;
056import org.apache.hadoop.hbase.testclassification.SmallTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.junit.jupiter.api.AfterAll;
059import org.junit.jupiter.api.BeforeAll;
060import org.junit.jupiter.api.BeforeEach;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.Test;
063import org.junit.jupiter.api.TestInfo;
064
065@Tag(SmallTests.TAG)
066@Tag(ClientTests.TAG)
067public class TestClientSideRegionScanner {
068  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
069  private static final TableName TABLE_NAME = TableName.valueOf("test");
070  private static final byte[] FAM_NAME = Bytes.toBytes("f");
071
072  private Configuration conf;
073  private Path rootDir;
074  private FileSystem fs;
075  private TableDescriptor htd;
076  private RegionInfo hri;
077  private Scan scan;
078  private String methodName;
079
080  @BeforeAll
081  public static void setUpBeforeClass() throws Exception {
082    TEST_UTIL.startMiniCluster(1);
083  }
084
085  @AfterAll
086  public static void tearDownAfterClass() throws Exception {
087    TEST_UTIL.shutdownMiniCluster();
088  }
089
090  @BeforeEach
091  public void setup(TestInfo testInfo) throws IOException {
092    this.methodName = testInfo.getTestMethod().get().getName();
093    conf = TEST_UTIL.getConfiguration();
094    rootDir = TEST_UTIL.getDefaultRootDirPath();
095    fs = TEST_UTIL.getTestFileSystem();
096    htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME);
097    hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0);
098    scan = new Scan();
099  }
100
101  @Test
102  public void testDefaultBlockCache() throws IOException {
103    Configuration copyConf = new Configuration(conf);
104    try (ClientSideRegionScanner clientSideRegionScanner =
105      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
106      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
107      assertNotNull(blockCache);
108      assertThat(blockCache, instanceOf(IndexOnlyLruBlockCache.class));
109      assertEquals(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT,
110        blockCache.getMaxSize());
111    }
112  }
113
114  @Test
115  public void testConfiguredBlockCache() throws IOException {
116    Configuration copyConf = new Configuration(conf);
117    // tiny 1MB fixed cache size
118    long blockCacheFixedSize = 1024 * 1024L;
119    copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize);
120    try (ClientSideRegionScanner clientSideRegionScanner =
121      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
122      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
123      assertNotNull(blockCache);
124      assertThat(blockCache, instanceOf(IndexOnlyLruBlockCache.class));
125      assertEquals(blockCacheFixedSize, blockCache.getMaxSize());
126    }
127  }
128
129  @Test
130  public void testNoBlockCache() throws IOException {
131    Configuration copyConf = new Configuration(conf);
132    copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
133    try (ClientSideRegionScanner clientSideRegionScanner =
134      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
135      BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
136      assertNull(blockCache);
137    }
138  }
139
140  @Test
141  public void testContinuesToScanIfHasMore() throws IOException {
142    // Conditions for this test to set up RegionScannerImpl to bail on the scan
143    // after a single iteration
144    // 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately
145    // 2. Configure a filter to filter out some rows, in this case rows with values < 5
146    // 3. Configure the filter's hasFilterRow to return true so RegionScannerImpl sets
147    // the limitScope to something with a depth of 0, so we bail on the scan after the first
148    // iteration
149
150    Configuration copyConf = new Configuration(conf);
151    copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
152    Scan scan = new Scan();
153    scan.setFilter(new FiltersRowsLessThan5());
154    scan.setLimit(1);
155
156    try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) {
157      TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
158      RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
159
160      for (int i = 0; i < 10; ++i) {
161        table.put(createPut(i));
162      }
163
164      // Flush contents to disk so we can scan the fs
165      TEST_UTIL.getAdmin().flush(TABLE_NAME);
166
167      try (ClientSideRegionScanner clientSideRegionScanner =
168        new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
169        RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner);
170        clientSideRegionScanner.scanner = scannerSpy;
171        Result result = clientSideRegionScanner.next();
172
173        verify(scannerSpy, times(6)).nextRaw(anyList());
174        assertNotNull(result);
175        assertEquals(Bytes.toInt(result.getRow()), 5);
176        assertTrue(clientSideRegionScanner.hasMore);
177
178        for (int i = 6; i < 10; ++i) {
179          result = clientSideRegionScanner.next();
180          verify(scannerSpy, times(i + 1)).nextRaw(anyList());
181          assertNotNull(result);
182          assertEquals(Bytes.toInt(result.getRow()), i);
183        }
184
185        result = clientSideRegionScanner.next();
186        assertNull(result);
187        assertFalse(clientSideRegionScanner.hasMore);
188      }
189    }
190  }
191
192  @Test
193  public void testScanMetricsDisabled() throws IOException {
194    Configuration copyConf = new Configuration(conf);
195    Scan scan = new Scan();
196    try (ClientSideRegionScanner clientSideRegionScanner =
197      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
198      clientSideRegionScanner.next();
199      assertNull(clientSideRegionScanner.getScanMetrics());
200    }
201  }
202
203  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
204    throws IOException {
205    Configuration copyConf = new Configuration(conf);
206    Scan scan = new Scan();
207    scan.setScanMetricsEnabled(true);
208    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
209    try (ClientSideRegionScanner clientSideRegionScanner =
210      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
211      clientSideRegionScanner.next();
212      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
213      assertNotNull(scanMetricsFromScanner);
214      if (scanMetrics != null) {
215        assertSame(scanMetrics, scanMetricsFromScanner);
216      }
217      Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
218      assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
219      assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
220    }
221  }
222
223  @Test
224  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
225    testScanMetricsWithScanMetricsByRegionDisabled(null);
226  }
227
228  @Test
229  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
230    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
231  }
232
233  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
234    Configuration copyConf = new Configuration(conf);
235    Scan scan = new Scan();
236    scan.setEnableScanMetricsByRegion(true);
237    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
238    try (ClientSideRegionScanner clientSideRegionScanner =
239      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
240      clientSideRegionScanner.next();
241      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
242      assertNotNull(scanMetricsFromScanner);
243      if (scanMetrics != null) {
244        assertSame(scanMetrics, scanMetricsFromScanner);
245      }
246      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
247        scanMetricsFromScanner.collectMetricsByRegion();
248      assertEquals(1, scanMetricsByRegion.size());
249      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
250        .entrySet()) {
251        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
252        Map<String, Long> metricsMap = entry.getValue();
253        assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
254        assertNull(scanMetricsRegionInfo.getServerName());
255        assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
256        assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
257          scanMetricsFromScanner.countOfRowsScanned.get());
258      }
259    }
260  }
261
262  @Test
263  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
264    testScanMetricByRegion(null);
265  }
266
267  @Test
268  public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
269    testScanMetricByRegion(new ScanMetrics());
270  }
271
272  @Test
273  public void testGetFilesRead() throws Exception {
274    // Create a table and add some data
275    TableName tableName = TableName.valueOf(methodName);
276    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAME })) {
277      TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName);
278      RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0);
279
280      // Add some data
281      for (int i = 0; i < 5; i++) {
282        byte[] row = Bytes.toBytes(i);
283        Put put = new Put(row);
284        put.addColumn(FAM_NAME, row, row);
285        table.put(put);
286      }
287
288      // Flush contents to disk so we can scan the fs
289      TEST_UTIL.getAdmin().flush(tableName);
290
291      // Create ClientSideRegionScanner with the correct table descriptor and region info
292      Configuration copyConf = new Configuration(conf);
293      Scan tableScan = new Scan();
294      ClientSideRegionScanner clientSideRegionScanner =
295        new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri, tableScan, null);
296
297      // Get expected file paths from the region before closing
298      // (after closing, the region will be closed too)
299      Set<Path> expectedFilePaths = new HashSet<>();
300      HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME);
301      for (HStoreFile storeFile : store.getStorefiles()) {
302        Path qualifiedPath = fs.makeQualified(storeFile.getPath());
303        expectedFilePaths.add(qualifiedPath);
304      }
305      int expectedFileCount = expectedFilePaths.size();
306      assertTrue(expectedFileCount >= 1, "Should have at least one store file after flush");
307
308      // Before closing, should return empty set
309      Set<Path> filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
310      assertTrue(filesReadBeforeClose.isEmpty(), "Should return empty set before closing");
311
312      // Scan through some results
313      Result result;
314      int count = 0;
315      while ((result = clientSideRegionScanner.next()) != null && count < 3) {
316        assertNotNull(result, "Result should not be null");
317        count++;
318      }
319
320      // Still should return empty set before closing
321      filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
322      assertTrue(filesReadBeforeClose.isEmpty(),
323        "Should return empty set before closing even after scanning");
324
325      // Close the scanner - this should collect files from the underlying scanner
326      clientSideRegionScanner.close();
327
328      // After closing, should return files from the underlying scanner
329      Set<Path> filesReadAfterClose = clientSideRegionScanner.getFilesRead();
330      // Verify exact file count
331      assertEquals(expectedFileCount, filesReadAfterClose.size(),
332        "Should have exact file count after closing");
333      // Verify exact file names match
334      assertEquals(expectedFilePaths, filesReadAfterClose,
335        "Should contain all expected file paths");
336    } finally {
337      TEST_UTIL.deleteTable(tableName);
338    }
339  }
340
341  private static Put createPut(int rowAsInt) {
342    byte[] row = Bytes.toBytes(rowAsInt);
343    Put put = new Put(row);
344    put.addColumn(FAM_NAME, row, row);
345    return put;
346  }
347
348  private static class FiltersRowsLessThan5 extends FilterBase {
349
350    @Override
351    public boolean filterRowKey(Cell cell) {
352      byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(),
353        cell.getRowLength() + cell.getRowOffset());
354      int intValue = Bytes.toInt(rowKey);
355      return intValue < 5;
356    }
357
358    @Override
359    public boolean hasFilterRow() {
360      return true;
361    }
362  }
363}