001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
056
057@Category({ RegionServerTests.class, SmallTests.class })
058public class TestSwitchToStreamRead {
059
060  @ClassRule
061  public static final HBaseClassTestRule CLASS_RULE =
062    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
063
064  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
065
066  private static TableName TABLE_NAME = TableName.valueOf("stream");
067
068  private static byte[] FAMILY = Bytes.toBytes("cf");
069
070  private static byte[] QUAL = Bytes.toBytes("cq");
071
072  private static String VALUE_PREFIX;
073
074  private static HRegion REGION;
075
076  @Before
077  public void setUp() throws IOException {
078    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
079    StringBuilder sb = new StringBuilder(256);
080    for (int i = 0; i < 255; i++) {
081      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
082    }
083    VALUE_PREFIX = sb.append("-").toString();
084    REGION = UTIL.createLocalHRegion(
085      TableDescriptorBuilder.newBuilder(TABLE_NAME)
086        .setColumnFamily(
087          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
088        .build(),
089      null, null);
090    for (int i = 0; i < 900; i++) {
091      REGION
092        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
093    }
094    REGION.flush(true);
095    for (int i = 900; i < 1000; i++) {
096      REGION
097        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
098    }
099  }
100
101  @After
102  public void tearDown() throws IOException {
103    REGION.close(true);
104    UTIL.cleanupTestDir();
105  }
106
107  @Test
108  public void test() throws IOException {
109    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
110      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
111      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
112        if (kvs instanceof StoreFileScanner) {
113          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
114          // starting from pread so we use shared reader here.
115          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
116        }
117      }
118      List<Cell> cells = new ArrayList<>();
119      for (int i = 0; i < 500; i++) {
120        assertTrue(scanner.next(cells));
121        Result result = Result.create(cells);
122        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
123        cells.clear();
124        scanner.shipped();
125      }
126      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
127        if (kvs instanceof StoreFileScanner) {
128          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
129          // we should have convert to use stream read now.
130          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
131        }
132      }
133      for (int i = 500; i < 1000; i++) {
134        assertEquals(i != 999, scanner.next(cells));
135        Result result = Result.create(cells);
136        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
137        cells.clear();
138        scanner.shipped();
139      }
140    }
141    // make sure all scanners are closed.
142    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
143      assertFalse(sf.isReferencedInReads());
144    }
145  }
146
147  public static final class MatchLastRowKeyFilter extends FilterBase {
148
149    @Override
150    public boolean filterRowKey(Cell cell) throws IOException {
151      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
152    }
153  }
154
155  private void testFilter(Filter filter) throws IOException {
156    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
157      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
158      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
159        if (kvs instanceof StoreFileScanner) {
160          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
161          // starting from pread so we use shared reader here.
162          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
163        }
164      }
165      List<Cell> cells = new ArrayList<>();
166      // should return before finishing the scan as we want to switch from pread to stream
167      assertTrue(scanner.next(cells,
168        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
169      assertTrue(cells.isEmpty());
170      scanner.shipped();
171
172      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
173        if (kvs instanceof StoreFileScanner) {
174          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
175          // we should have convert to use stream read now.
176          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
177        }
178      }
179      assertFalse(scanner.next(cells,
180        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
181      Result result = Result.create(cells);
182      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
183      cells.clear();
184      scanner.shipped();
185    }
186    // make sure all scanners are closed.
187    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
188      assertFalse(sf.isReferencedInReads());
189    }
190  }
191
192  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
193  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
194  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
195  // an infinite loop. Need to dig more, the code are way too complicated...
196  @Ignore
197  @Test
198  public void testFilterRowKey() throws IOException {
199    testFilter(new MatchLastRowKeyFilter());
200  }
201
202  public static final class MatchLastRowCellNextColFilter extends FilterBase {
203
204    @Override
205    public ReturnCode filterCell(Cell c) throws IOException {
206      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
207        return ReturnCode.INCLUDE;
208      } else {
209        return ReturnCode.NEXT_COL;
210      }
211    }
212  }
213
214  @Test
215  public void testFilterCellNextCol() throws IOException {
216    testFilter(new MatchLastRowCellNextColFilter());
217  }
218
219  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
220
221    @Override
222    public ReturnCode filterCell(Cell c) throws IOException {
223      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
224        return ReturnCode.INCLUDE;
225      } else {
226        return ReturnCode.NEXT_ROW;
227      }
228    }
229  }
230
231  @Test
232  public void testFilterCellNextRow() throws IOException {
233    testFilter(new MatchLastRowCellNextRowFilter());
234  }
235
236  public static final class MatchLastRowFilterRowFilter extends FilterBase {
237
238    private boolean exclude;
239
240    @Override
241    public void filterRowCells(List<Cell> kvs) throws IOException {
242      Cell c = kvs.get(0);
243      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
244    }
245
246    @Override
247    public void reset() throws IOException {
248      exclude = false;
249    }
250
251    @Override
252    public boolean filterRow() throws IOException {
253      return exclude;
254    }
255
256    @Override
257    public boolean hasFilterRow() {
258      return true;
259    }
260  }
261
262  @Test
263  public void testFilterRow() throws IOException {
264    testFilter(new MatchLastRowFilterRowFilter());
265  }
266
267  /**
268   * Verifies that when the store scanner switches from pread to stream read successfully, all store
269   * files that were read (including those closed during the switch) are reported by
270   * {@link StoreScanner#getFilesRead()} after close.
271   */
272  @Test
273  public void testGetFilesReadOnTrySwitchToStreamRead() throws Exception {
274    HStore store = REGION.getStore(FAMILY);
275    FileSystem fs = REGION.getFilesystem();
276
277    // Set a very small preadMaxBytes so that trySwitchToStreamRead is triggered during scan.
278    long originalPreadMaxBytes =
279      UTIL.getConfiguration().getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
280    try {
281      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 10L);
282
283      ScanInfo scanInfo =
284        new ScanInfo(UTIL.getConfiguration(), FAMILY, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
285          org.apache.hadoop.hbase.KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
286          org.apache.hadoop.hbase.CellComparator.getInstance(), false);
287      Scan scan = new Scan().setReadType(Scan.ReadType.DEFAULT);
288      long readPt =
289        REGION.getReadPoint(org.apache.hadoop.hbase.client.IsolationLevel.READ_COMMITTED);
290
291      StoreScanner storeScanner = new StoreScanner(store, scanInfo, scan, null, readPt);
292
293      // Collect expected store file paths (qualified) for assertion after close.
294      Set<Path> expectedFilePaths = new HashSet<>();
295      for (HStoreFile sf : store.getStorefiles()) {
296        expectedFilePaths.add(fs.makeQualified(sf.getPath()));
297      }
298      assertFalse("Should have at least one store file", expectedFilePaths.isEmpty());
299
300      // Verify scanners start in PREAD mode before the switch.
301      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
302        if (kvs instanceof StoreFileScanner) {
303          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
304          assertSame("Scanner should start in PREAD mode", ReaderType.PREAD,
305            sfScanner.getReader().getReaderContext().getReaderType());
306        }
307      }
308
309      // Scan a few rows and call shipped() to trigger trySwitchToStreamRead.
310      List<Cell> results = new ArrayList<>();
311      ScannerContext scannerContext = ScannerContext.newBuilder().build();
312      boolean switchVerified = false;
313      while (storeScanner.next(results, scannerContext)) {
314        results.clear();
315        storeScanner.shipped();
316
317        // Check mid-scan, whether the switch happened.
318        if (!switchVerified) {
319          boolean allSwitched = true;
320          for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
321            if (kvs instanceof StoreFileScanner) {
322              StoreFileScanner sfScanner = (StoreFileScanner) kvs;
323              if (sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD) {
324                allSwitched = false;
325                break;
326              }
327            }
328          }
329          if (allSwitched) {
330            switchVerified = true;
331          }
332        }
333      }
334      assertTrue("trySwitchToStreamRead should have been invoked and scanners switched to stream",
335        switchVerified);
336
337      // Not closing the scanners explicitly, because those must be closed during
338      // trySwitchToStreamRead
339
340      // After close: files that were read (including those closed during switch) must be tracked.
341      Set<Path> filesRead = storeScanner.getFilesRead();
342      assertEquals("Should have exact file count after close", expectedFilePaths.size(),
343        filesRead.size());
344      assertEquals("Should contain all expected store file paths", expectedFilePaths, filesRead);
345    } finally {
346      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
347        originalPreadMaxBytes);
348    }
349  }
350}