001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.ArgumentMatchers.anyList;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.times;
029import static org.mockito.Mockito.verify;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.HashSet;
034import java.util.Map;
035import java.util.Set;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.Cell;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseTestingUtil;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
045import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
046import org.apache.hadoop.hbase.filter.FilterBase;
047import org.apache.hadoop.hbase.io.hfile.BlockCache;
048import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
049import org.apache.hadoop.hbase.regionserver.HStore;
050import org.apache.hadoop.hbase.regionserver.HStoreFile;
051import org.apache.hadoop.hbase.regionserver.RegionScanner;
052import org.apache.hadoop.hbase.regionserver.StoreScanner;
053import org.apache.hadoop.hbase.testclassification.ClientTests;
054import org.apache.hadoop.hbase.testclassification.SmallTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.junit.AfterClass;
057import org.junit.Assert;
058import org.junit.Before;
059import org.junit.BeforeClass;
060import org.junit.ClassRule;
061import org.junit.Rule;
062import org.junit.Test;
063import org.junit.experimental.categories.Category;
064import org.junit.rules.TestName;
065
066@Category({ SmallTests.class, ClientTests.class })
067public class TestClientSideRegionScanner {
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070    HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);
071
072  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
073  private static final TableName TABLE_NAME = TableName.valueOf("test");
074  private static final byte[] FAM_NAME = Bytes.toBytes("f");
075
076  private Configuration conf;
077  private Path rootDir;
078  private FileSystem fs;
079  private TableDescriptor htd;
080  private RegionInfo hri;
081  private Scan scan;
082
083  @Rule
084  public TestName name = new TestName();
085
086  @BeforeClass
087  public static void setUpBeforeClass() throws Exception {
088    TEST_UTIL.startMiniCluster(1);
089  }
090
091  @AfterClass
092  public static void tearDownAfterClass() throws Exception {
093    TEST_UTIL.shutdownMiniCluster();
094  }
095
096  @Before
097  public void setup() throws IOException {
098    conf = TEST_UTIL.getConfiguration();
099    rootDir = TEST_UTIL.getDefaultRootDirPath();
100    fs = TEST_UTIL.getTestFileSystem();
101    htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME);
102    hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0);
103    scan = new Scan();
104  }
105
106  @Test
107  public void testDefaultBlockCache() throws IOException {
108    Configuration copyConf = new Configuration(conf);
109    ClientSideRegionScanner clientSideRegionScanner =
110      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
111
112    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
113    assertNotNull(blockCache);
114    assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
115    assertTrue(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT
116        == blockCache.getMaxSize());
117  }
118
119  @Test
120  public void testConfiguredBlockCache() throws IOException {
121    Configuration copyConf = new Configuration(conf);
122    // tiny 1MB fixed cache size
123    long blockCacheFixedSize = 1024 * 1024L;
124    copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize);
125    ClientSideRegionScanner clientSideRegionScanner =
126      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
127
128    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
129    assertNotNull(blockCache);
130    assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
131    assertTrue(blockCacheFixedSize == blockCache.getMaxSize());
132  }
133
134  @Test
135  public void testNoBlockCache() throws IOException {
136    Configuration copyConf = new Configuration(conf);
137    copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
138    ClientSideRegionScanner clientSideRegionScanner =
139      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
140
141    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
142    assertNull(blockCache);
143  }
144
145  @Test
146  public void testContinuesToScanIfHasMore() throws IOException {
147    // Conditions for this test to set up RegionScannerImpl to bail on the scan
148    // after a single iteration
149    // 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately
150    // 2. Configure a filter to filter out some rows, in this case rows with values < 5
151    // 3. Configure the filter's hasFilterRow to return true so RegionScannerImpl sets
152    // the limitScope to something with a depth of 0, so we bail on the scan after the first
153    // iteration
154
155    Configuration copyConf = new Configuration(conf);
156    copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
157    Scan scan = new Scan();
158    scan.setFilter(new FiltersRowsLessThan5());
159    scan.setLimit(1);
160
161    try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) {
162      TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
163      RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
164
165      for (int i = 0; i < 10; ++i) {
166        table.put(createPut(i));
167      }
168
169      // Flush contents to disk so we can scan the fs
170      TEST_UTIL.getAdmin().flush(TABLE_NAME);
171
172      ClientSideRegionScanner clientSideRegionScanner =
173        new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
174      RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner);
175      clientSideRegionScanner.scanner = scannerSpy;
176      Result result = clientSideRegionScanner.next();
177
178      verify(scannerSpy, times(6)).nextRaw(anyList());
179      assertNotNull(result);
180      assertEquals(Bytes.toInt(result.getRow()), 5);
181      assertTrue(clientSideRegionScanner.hasMore);
182
183      for (int i = 6; i < 10; ++i) {
184        result = clientSideRegionScanner.next();
185        verify(scannerSpy, times(i + 1)).nextRaw(anyList());
186        assertNotNull(result);
187        assertEquals(Bytes.toInt(result.getRow()), i);
188      }
189
190      result = clientSideRegionScanner.next();
191      assertNull(result);
192      assertFalse(clientSideRegionScanner.hasMore);
193    }
194  }
195
196  @Test
197  public void testScanMetricsDisabled() throws IOException {
198    Configuration copyConf = new Configuration(conf);
199    Scan scan = new Scan();
200    try (ClientSideRegionScanner clientSideRegionScanner =
201      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
202      clientSideRegionScanner.next();
203      assertNull(clientSideRegionScanner.getScanMetrics());
204    }
205  }
206
207  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
208    throws IOException {
209    Configuration copyConf = new Configuration(conf);
210    Scan scan = new Scan();
211    scan.setScanMetricsEnabled(true);
212    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
213    try (ClientSideRegionScanner clientSideRegionScanner =
214      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
215      clientSideRegionScanner.next();
216      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
217      assertNotNull(scanMetricsFromScanner);
218      if (scanMetrics != null) {
219        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
220      }
221      Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
222      Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
223      Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
224    }
225  }
226
227  @Test
228  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
229    testScanMetricsWithScanMetricsByRegionDisabled(null);
230  }
231
232  @Test
233  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
234    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
235  }
236
237  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
238    Configuration copyConf = new Configuration(conf);
239    Scan scan = new Scan();
240    scan.setEnableScanMetricsByRegion(true);
241    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
242    try (ClientSideRegionScanner clientSideRegionScanner =
243      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
244      clientSideRegionScanner.next();
245      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
246      assertNotNull(scanMetricsFromScanner);
247      if (scanMetrics != null) {
248        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
249      }
250      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
251        scanMetricsFromScanner.collectMetricsByRegion();
252      Assert.assertEquals(1, scanMetricsByRegion.size());
253      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
254        .entrySet()) {
255        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
256        Map<String, Long> metricsMap = entry.getValue();
257        Assert.assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
258        Assert.assertNull(scanMetricsRegionInfo.getServerName());
259        Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
260        Assert.assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
261          scanMetricsFromScanner.countOfRowsScanned.get());
262      }
263    }
264  }
265
266  @Test
267  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
268    testScanMetricByRegion(null);
269  }
270
271  @Test
272  public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
273    testScanMetricByRegion(new ScanMetrics());
274  }
275
276  @Test
277  public void testGetFilesRead() throws Exception {
278    // Create a table and add some data
279    TableName tableName = TableName.valueOf(name.getMethodName());
280    try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAME })) {
281      TableDescriptor tableHtd = TEST_UTIL.getAdmin().getDescriptor(tableName);
282      RegionInfo tableHri = TEST_UTIL.getAdmin().getRegions(tableName).get(0);
283
284      // Add some data
285      for (int i = 0; i < 5; i++) {
286        byte[] row = Bytes.toBytes(i);
287        Put put = new Put(row);
288        put.addColumn(FAM_NAME, row, row);
289        table.put(put);
290      }
291
292      // Flush contents to disk so we can scan the fs
293      TEST_UTIL.getAdmin().flush(tableName);
294
295      // Create ClientSideRegionScanner with the correct table descriptor and region info
296      Configuration copyConf = new Configuration(conf);
297      Scan tableScan = new Scan();
298      ClientSideRegionScanner clientSideRegionScanner =
299        new ClientSideRegionScanner(copyConf, fs, rootDir, tableHtd, tableHri, tableScan, null);
300
301      // Get expected file paths from the region before closing
302      // (after closing, the region will be closed too)
303      Set<Path> expectedFilePaths = new HashSet<>();
304      HStore store = clientSideRegionScanner.getRegion().getStore(FAM_NAME);
305      for (HStoreFile storeFile : store.getStorefiles()) {
306        Path qualifiedPath = fs.makeQualified(storeFile.getPath());
307        expectedFilePaths.add(qualifiedPath);
308      }
309      int expectedFileCount = expectedFilePaths.size();
310      assertTrue("Should have at least one store file after flush", expectedFileCount >= 1);
311
312      // Before closing, should return empty set
313      Set<Path> filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
314      assertTrue("Should return empty set before closing", filesReadBeforeClose.isEmpty());
315
316      // Scan through some results
317      Result result;
318      int count = 0;
319      while ((result = clientSideRegionScanner.next()) != null && count < 3) {
320        assertNotNull("Result should not be null", result);
321        count++;
322      }
323
324      // Still should return empty set before closing
325      filesReadBeforeClose = clientSideRegionScanner.getFilesRead();
326      assertTrue("Should return empty set before closing even after scanning",
327        filesReadBeforeClose.isEmpty());
328
329      // Close the scanner - this should collect files from the underlying scanner
330      clientSideRegionScanner.close();
331
332      // After closing, should return files from the underlying scanner
333      Set<Path> filesReadAfterClose = clientSideRegionScanner.getFilesRead();
334      // Verify exact file count
335      assertEquals("Should have exact file count after closing", expectedFileCount,
336        filesReadAfterClose.size());
337      // Verify exact file names match
338      assertEquals("Should contain all expected file paths", expectedFilePaths,
339        filesReadAfterClose);
340    } finally {
341      TEST_UTIL.deleteTable(tableName);
342    }
343  }
344
345  private static Put createPut(int rowAsInt) {
346    byte[] row = Bytes.toBytes(rowAsInt);
347    Put put = new Put(row);
348    put.addColumn(FAM_NAME, row, row);
349    return put;
350  }
351
352  private static class FiltersRowsLessThan5 extends FilterBase {
353
354    @Override
355    public boolean filterRowKey(Cell cell) {
356      byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(),
357        cell.getRowLength() + cell.getRowOffset());
358      int intValue = Bytes.toInt(rowKey);
359      return intValue < 5;
360    }
361
362    @Override
363    public boolean hasFilterRow() {
364      return true;
365    }
366  }
367}