001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertSame;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ThreadLocalRandom;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseTestingUtil;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.filter.Filter;
043import org.apache.hadoop.hbase.filter.FilterBase;
044import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
045import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
046import org.apache.hadoop.hbase.testclassification.RegionServerTests;
047import org.apache.hadoop.hbase.testclassification.SmallTests;
048import org.apache.hadoop.hbase.util.Bytes;
049import org.junit.jupiter.api.AfterEach;
050import org.junit.jupiter.api.BeforeEach;
051import org.junit.jupiter.api.Disabled;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054
055@Tag(RegionServerTests.TAG)
056@Tag(SmallTests.TAG)
057public class TestSwitchToStreamRead {
058
059  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
060
061  private static TableName TABLE_NAME = TableName.valueOf("stream");
062
063  private static byte[] FAMILY = Bytes.toBytes("cf");
064
065  private static byte[] QUAL = Bytes.toBytes("cq");
066
067  private static String VALUE_PREFIX;
068
069  private static HRegion REGION;
070
071  @BeforeEach
072  public void setUp() throws IOException {
073    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
074    StringBuilder sb = new StringBuilder(256);
075    for (int i = 0; i < 255; i++) {
076      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
077    }
078    VALUE_PREFIX = sb.append("-").toString();
079    REGION = UTIL.createLocalHRegion(
080      TableDescriptorBuilder.newBuilder(TABLE_NAME)
081        .setColumnFamily(
082          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
083        .build(),
084      null, null);
085    for (int i = 0; i < 900; i++) {
086      REGION
087        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
088    }
089    REGION.flush(true);
090    for (int i = 900; i < 1000; i++) {
091      REGION
092        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
093    }
094  }
095
096  @AfterEach
097  public void tearDown() throws IOException {
098    REGION.close(true);
099    UTIL.cleanupTestDir();
100  }
101
102  @Test
103  public void test() throws IOException {
104    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
105      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
106      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
107        if (kvs instanceof StoreFileScanner) {
108          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
109          // starting from pread so we use shared reader here.
110          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
111        }
112      }
113      List<Cell> cells = new ArrayList<>();
114      for (int i = 0; i < 500; i++) {
115        assertTrue(scanner.next(cells));
116        Result result = Result.create(cells);
117        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
118        cells.clear();
119        scanner.shipped();
120      }
121      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
122        if (kvs instanceof StoreFileScanner) {
123          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
124          // we should have convert to use stream read now.
125          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
126        }
127      }
128      for (int i = 500; i < 1000; i++) {
129        assertEquals(i != 999, scanner.next(cells));
130        Result result = Result.create(cells);
131        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
132        cells.clear();
133        scanner.shipped();
134      }
135    }
136    // make sure all scanners are closed.
137    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
138      assertFalse(sf.isReferencedInReads());
139    }
140  }
141
142  public static final class MatchLastRowKeyFilter extends FilterBase {
143
144    @Override
145    public boolean filterRowKey(Cell cell) throws IOException {
146      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
147    }
148  }
149
150  private void testFilter(Filter filter) throws IOException {
151    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
152      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
153      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
154        if (kvs instanceof StoreFileScanner) {
155          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
156          // starting from pread so we use shared reader here.
157          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
158        }
159      }
160      List<Cell> cells = new ArrayList<>();
161      // should return before finishing the scan as we want to switch from pread to stream
162      assertTrue(scanner.next(cells,
163        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
164      assertTrue(cells.isEmpty());
165      scanner.shipped();
166
167      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
168        if (kvs instanceof StoreFileScanner) {
169          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
170          // we should have convert to use stream read now.
171          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
172        }
173      }
174      assertFalse(scanner.next(cells,
175        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
176      Result result = Result.create(cells);
177      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
178      cells.clear();
179      scanner.shipped();
180    }
181    // make sure all scanners are closed.
182    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
183      assertFalse(sf.isReferencedInReads());
184    }
185  }
186
187  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
188  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
189  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
190  // an infinite loop. Need to dig more, the code are way too complicated...
191  @Disabled
192  @Test
193  public void testFilterRowKey() throws IOException {
194    testFilter(new MatchLastRowKeyFilter());
195  }
196
197  public static final class MatchLastRowCellNextColFilter extends FilterBase {
198
199    @Override
200    public ReturnCode filterCell(Cell c) throws IOException {
201      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
202        return ReturnCode.INCLUDE;
203      } else {
204        return ReturnCode.NEXT_COL;
205      }
206    }
207  }
208
209  @Test
210  public void testFilterCellNextCol() throws IOException {
211    testFilter(new MatchLastRowCellNextColFilter());
212  }
213
214  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
215
216    @Override
217    public ReturnCode filterCell(Cell c) throws IOException {
218      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
219        return ReturnCode.INCLUDE;
220      } else {
221        return ReturnCode.NEXT_ROW;
222      }
223    }
224  }
225
226  @Test
227  public void testFilterCellNextRow() throws IOException {
228    testFilter(new MatchLastRowCellNextRowFilter());
229  }
230
231  public static final class MatchLastRowFilterRowFilter extends FilterBase {
232
233    private boolean exclude;
234
235    @Override
236    public void filterRowCells(List<Cell> kvs) throws IOException {
237      Cell c = kvs.get(0);
238      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
239    }
240
241    @Override
242    public void reset() throws IOException {
243      exclude = false;
244    }
245
246    @Override
247    public boolean filterRow() throws IOException {
248      return exclude;
249    }
250
251    @Override
252    public boolean hasFilterRow() {
253      return true;
254    }
255  }
256
257  @Test
258  public void testFilterRow() throws IOException {
259    testFilter(new MatchLastRowFilterRowFilter());
260  }
261
262  /**
263   * Verifies that when the store scanner switches from pread to stream read successfully, all store
264   * files that were read (including those closed during the switch) are reported by
265   * {@link StoreScanner#getFilesRead()} after close.
266   */
267  @Test
268  public void testGetFilesReadOnTrySwitchToStreamRead() throws Exception {
269    HStore store = REGION.getStore(FAMILY);
270    FileSystem fs = REGION.getFilesystem();
271
272    // Set a very small preadMaxBytes so that trySwitchToStreamRead is triggered during scan.
273    long originalPreadMaxBytes =
274      UTIL.getConfiguration().getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
275    try {
276      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 10L);
277
278      ScanInfo scanInfo =
279        new ScanInfo(UTIL.getConfiguration(), FAMILY, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
280          org.apache.hadoop.hbase.KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0,
281          org.apache.hadoop.hbase.CellComparator.getInstance(), false);
282      Scan scan = new Scan().setReadType(Scan.ReadType.DEFAULT);
283      long readPt =
284        REGION.getReadPoint(org.apache.hadoop.hbase.client.IsolationLevel.READ_COMMITTED);
285
286      StoreScanner storeScanner = new StoreScanner(store, scanInfo, scan, null, readPt);
287
288      // Collect expected store file paths (qualified) for assertion after close.
289      Set<Path> expectedFilePaths = new HashSet<>();
290      for (HStoreFile sf : store.getStorefiles()) {
291        expectedFilePaths.add(fs.makeQualified(sf.getPath()));
292      }
293      assertFalse(expectedFilePaths.isEmpty(), "Should have at least one store file");
294
295      // Verify scanners start in PREAD mode before the switch.
296      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
297        if (kvs instanceof StoreFileScanner) {
298          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
299          assertSame(ReaderType.PREAD, sfScanner.getReader().getReaderContext().getReaderType(),
300            "Scanner should start in PREAD mode");
301        }
302      }
303
304      // Scan a few rows and call shipped() to trigger trySwitchToStreamRead.
305      List<Cell> results = new ArrayList<>();
306      ScannerContext scannerContext = ScannerContext.newBuilder().build();
307      boolean switchVerified = false;
308      while (storeScanner.next(results, scannerContext)) {
309        results.clear();
310        storeScanner.shipped();
311
312        // Check mid-scan, whether the switch happened.
313        if (!switchVerified) {
314          boolean allSwitched = true;
315          for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
316            if (kvs instanceof StoreFileScanner) {
317              StoreFileScanner sfScanner = (StoreFileScanner) kvs;
318              if (sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD) {
319                allSwitched = false;
320                break;
321              }
322            }
323          }
324          if (allSwitched) {
325            switchVerified = true;
326          }
327        }
328      }
329      assertTrue(switchVerified,
330        "trySwitchToStreamRead should have been invoked and scanners switched to stream");
331
332      // Not closing the scanners explicitly, because those must be closed during
333      // trySwitchToStreamRead
334
335      // After close: files that were read (including those closed during switch) must be tracked.
336      Set<Path> filesRead = storeScanner.getFilesRead();
337      assertEquals(expectedFilePaths.size(), filesRead.size(),
338        "Should have exact file count after close");
339      assertEquals(expectedFilePaths, filesRead, "Should contain all expected store file paths");
340    } finally {
341      UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
342        originalPreadMaxBytes);
343    }
344  }
345}