001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNull;
025import static org.junit.Assert.assertTrue;
026import static org.mockito.ArgumentMatchers.anyList;
027import static org.mockito.Mockito.spy;
028import static org.mockito.Mockito.times;
029import static org.mockito.Mockito.verify;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Map;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.Cell;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
043import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
044import org.apache.hadoop.hbase.filter.FilterBase;
045import org.apache.hadoop.hbase.io.hfile.BlockCache;
046import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
047import org.apache.hadoop.hbase.regionserver.RegionScanner;
048import org.apache.hadoop.hbase.regionserver.StoreScanner;
049import org.apache.hadoop.hbase.testclassification.ClientTests;
050import org.apache.hadoop.hbase.testclassification.SmallTests;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.junit.AfterClass;
053import org.junit.Assert;
054import org.junit.Before;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059
060@Category({ SmallTests.class, ClientTests.class })
061public class TestClientSideRegionScanner {
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestClientSideRegionScanner.class);
065
066  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
067  private static final TableName TABLE_NAME = TableName.valueOf("test");
068  private static final byte[] FAM_NAME = Bytes.toBytes("f");
069
070  private Configuration conf;
071  private Path rootDir;
072  private FileSystem fs;
073  private TableDescriptor htd;
074  private RegionInfo hri;
075  private Scan scan;
076
077  @BeforeClass
078  public static void setUpBeforeClass() throws Exception {
079    TEST_UTIL.startMiniCluster(1);
080  }
081
082  @AfterClass
083  public static void tearDownAfterClass() throws Exception {
084    TEST_UTIL.shutdownMiniCluster();
085  }
086
087  @Before
088  public void setup() throws IOException {
089    conf = TEST_UTIL.getConfiguration();
090    rootDir = TEST_UTIL.getDefaultRootDirPath();
091    fs = TEST_UTIL.getTestFileSystem();
092    htd = TEST_UTIL.getAdmin().getDescriptor(TableName.META_TABLE_NAME);
093    hri = TEST_UTIL.getAdmin().getRegions(TableName.META_TABLE_NAME).get(0);
094    scan = new Scan();
095  }
096
097  @Test
098  public void testDefaultBlockCache() throws IOException {
099    Configuration copyConf = new Configuration(conf);
100    ClientSideRegionScanner clientSideRegionScanner =
101      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
102
103    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
104    assertNotNull(blockCache);
105    assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
106    assertTrue(HConstants.HBASE_CLIENT_SCANNER_ONHEAP_BLOCK_CACHE_FIXED_SIZE_DEFAULT
107        == blockCache.getMaxSize());
108  }
109
110  @Test
111  public void testConfiguredBlockCache() throws IOException {
112    Configuration copyConf = new Configuration(conf);
113    // tiny 1MB fixed cache size
114    long blockCacheFixedSize = 1024 * 1024L;
115    copyConf.setLong(HConstants.HFILE_ONHEAP_BLOCK_CACHE_FIXED_SIZE_KEY, blockCacheFixedSize);
116    ClientSideRegionScanner clientSideRegionScanner =
117      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
118
119    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
120    assertNotNull(blockCache);
121    assertTrue(blockCache instanceof IndexOnlyLruBlockCache);
122    assertTrue(blockCacheFixedSize == blockCache.getMaxSize());
123  }
124
125  @Test
126  public void testNoBlockCache() throws IOException {
127    Configuration copyConf = new Configuration(conf);
128    copyConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
129    ClientSideRegionScanner clientSideRegionScanner =
130      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
131
132    BlockCache blockCache = clientSideRegionScanner.getRegion().getBlockCache();
133    assertNull(blockCache);
134  }
135
136  @Test
137  public void testContinuesToScanIfHasMore() throws IOException {
138    // Conditions for this test to set up RegionScannerImpl to bail on the scan
139    // after a single iteration
140    // 1. Configure preadMaxBytes to something small to trigger scannerContext#returnImmediately
141    // 2. Configure a filter to filter out some rows, in this case rows with values < 5
142    // 3. Configure the filter's hasFilterRow to return true so RegionScannerImpl sets
143    // the limitScope to something with a depth of 0, so we bail on the scan after the first
144    // iteration
145
146    Configuration copyConf = new Configuration(conf);
147    copyConf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1);
148    Scan scan = new Scan();
149    scan.setFilter(new FiltersRowsLessThan5());
150    scan.setLimit(1);
151
152    try (Table table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAME)) {
153      TableDescriptor htd = TEST_UTIL.getAdmin().getDescriptor(TABLE_NAME);
154      RegionInfo hri = TEST_UTIL.getAdmin().getRegions(TABLE_NAME).get(0);
155
156      for (int i = 0; i < 10; ++i) {
157        table.put(createPut(i));
158      }
159
160      // Flush contents to disk so we can scan the fs
161      TEST_UTIL.getAdmin().flush(TABLE_NAME);
162
163      ClientSideRegionScanner clientSideRegionScanner =
164        new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null);
165      RegionScanner scannerSpy = spy(clientSideRegionScanner.scanner);
166      clientSideRegionScanner.scanner = scannerSpy;
167      Result result = clientSideRegionScanner.next();
168
169      verify(scannerSpy, times(6)).nextRaw(anyList());
170      assertNotNull(result);
171      assertEquals(Bytes.toInt(result.getRow()), 5);
172      assertTrue(clientSideRegionScanner.hasMore);
173
174      for (int i = 6; i < 10; ++i) {
175        result = clientSideRegionScanner.next();
176        verify(scannerSpy, times(i + 1)).nextRaw(anyList());
177        assertNotNull(result);
178        assertEquals(Bytes.toInt(result.getRow()), i);
179      }
180
181      result = clientSideRegionScanner.next();
182      assertNull(result);
183      assertFalse(clientSideRegionScanner.hasMore);
184    }
185  }
186
187  @Test
188  public void testScanMetricsDisabled() throws IOException {
189    Configuration copyConf = new Configuration(conf);
190    Scan scan = new Scan();
191    try (ClientSideRegionScanner clientSideRegionScanner =
192      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
193      clientSideRegionScanner.next();
194      assertNull(clientSideRegionScanner.getScanMetrics());
195    }
196  }
197
198  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
199    throws IOException {
200    Configuration copyConf = new Configuration(conf);
201    Scan scan = new Scan();
202    scan.setScanMetricsEnabled(true);
203    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
204    try (ClientSideRegionScanner clientSideRegionScanner =
205      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
206      clientSideRegionScanner.next();
207      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
208      assertNotNull(scanMetricsFromScanner);
209      if (scanMetrics != null) {
210        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
211      }
212      Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
213      Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
214      Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
215    }
216  }
217
218  @Test
219  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
220    testScanMetricsWithScanMetricsByRegionDisabled(null);
221  }
222
223  @Test
224  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
225    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
226  }
227
228  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
229    Configuration copyConf = new Configuration(conf);
230    Scan scan = new Scan();
231    scan.setEnableScanMetricsByRegion(true);
232    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
233    try (ClientSideRegionScanner clientSideRegionScanner =
234      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
235      clientSideRegionScanner.next();
236      ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
237      assertNotNull(scanMetricsFromScanner);
238      if (scanMetrics != null) {
239        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
240      }
241      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
242        scanMetricsFromScanner.collectMetricsByRegion();
243      Assert.assertEquals(1, scanMetricsByRegion.size());
244      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
245        .entrySet()) {
246        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
247        Map<String, Long> metricsMap = entry.getValue();
248        Assert.assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
249        Assert.assertNull(scanMetricsRegionInfo.getServerName());
250        Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
251        Assert.assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
252          scanMetricsFromScanner.countOfRowsScanned.get());
253      }
254    }
255  }
256
257  @Test
258  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
259    testScanMetricByRegion(null);
260  }
261
262  @Test
263  public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
264    testScanMetricByRegion(new ScanMetrics());
265  }
266
267  private static Put createPut(int rowAsInt) {
268    byte[] row = Bytes.toBytes(rowAsInt);
269    Put put = new Put(row);
270    put.addColumn(FAM_NAME, row, row);
271    return put;
272  }
273
274  private static class FiltersRowsLessThan5 extends FilterBase {
275
276    @Override
277    public boolean filterRowKey(Cell cell) {
278      byte[] rowKey = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(),
279        cell.getRowLength() + cell.getRowOffset());
280      int intValue = Bytes.toInt(rowKey);
281      return intValue < 5;
282    }
283
284    @Override
285    public boolean hasFilterRow() {
286      return true;
287    }
288  }
289}