001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.ThreadLocalRandom;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtility;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.filter.Filter;
037import org.apache.hadoop.hbase.filter.FilterBase;
038import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
039import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
040import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
041import org.apache.hadoop.hbase.testclassification.RegionServerTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.After;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Ignore;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051@Category({ RegionServerTests.class, SmallTests.class })
052public class TestSwitchToStreamRead {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
057
058  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
059
060  private static TableName TABLE_NAME = TableName.valueOf("stream");
061
062  private static byte[] FAMILY = Bytes.toBytes("cf");
063
064  private static byte[] QUAL = Bytes.toBytes("cq");
065
066  private static String VALUE_PREFIX;
067
068  private static HRegion REGION;
069
070  @Before
071  public void setUp() throws IOException {
072    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
073    StringBuilder sb = new StringBuilder(256);
074    for (int i = 0; i < 255; i++) {
075      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
076    }
077    VALUE_PREFIX = sb.append("-").toString();
078    REGION = UTIL.createLocalHRegion(
079      TableDescriptorBuilder.newBuilder(TABLE_NAME)
080        .setColumnFamily(
081          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
082        .build(),
083      null, null);
084    for (int i = 0; i < 900; i++) {
085      REGION
086        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
087    }
088    REGION.flush(true);
089    for (int i = 900; i < 1000; i++) {
090      REGION
091        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
092    }
093  }
094
095  @After
096  public void tearDown() throws IOException {
097    REGION.close(true);
098    UTIL.cleanupTestDir();
099  }
100
101  @Test
102  public void test() throws IOException {
103    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
104      StoreScanner storeScanner =
105        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
106      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
107        if (kvs instanceof StoreFileScanner) {
108          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
109          // starting from pread so we use shared reader here.
110          assertTrue(sfScanner.getReader().getReaderContext()
111            .getReaderType() == ReaderType.PREAD);
112        }
113      }
114      List<Cell> cells = new ArrayList<>();
115      for (int i = 0; i < 500; i++) {
116        assertTrue(scanner.next(cells));
117        Result result = Result.create(cells);
118        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
119        cells.clear();
120        scanner.shipped();
121      }
122      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
123        if (kvs instanceof StoreFileScanner) {
124          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
125          // we should have convert to use stream read now.
126          assertFalse(sfScanner.getReader().getReaderContext()
127            .getReaderType() == ReaderType.PREAD);
128        }
129      }
130      for (int i = 500; i < 1000; i++) {
131        assertEquals(i != 999, scanner.next(cells));
132        Result result = Result.create(cells);
133        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
134        cells.clear();
135        scanner.shipped();
136      }
137    }
138    // make sure all scanners are closed.
139    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
140      assertFalse(sf.isReferencedInReads());
141    }
142  }
143
144  public static final class MatchLastRowKeyFilter extends FilterBase {
145
146    @Override
147    public boolean filterRowKey(Cell cell) throws IOException {
148      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
149    }
150  }
151
152  private void testFilter(Filter filter) throws IOException {
153    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
154      StoreScanner storeScanner =
155        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
156      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
157        if (kvs instanceof StoreFileScanner) {
158          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
159          // starting from pread so we use shared reader here.
160          assertTrue(sfScanner.getReader().getReaderContext()
161            .getReaderType() == ReaderType.PREAD);
162        }
163      }
164      List<Cell> cells = new ArrayList<>();
165      // should return before finishing the scan as we want to switch from pread to stream
166      assertTrue(scanner.next(cells,
167        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
168      assertTrue(cells.isEmpty());
169      scanner.shipped();
170
171      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
172        if (kvs instanceof StoreFileScanner) {
173          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
174          // we should have convert to use stream read now.
175          assertFalse(sfScanner.getReader().getReaderContext()
176            .getReaderType() == ReaderType.PREAD);
177        }
178      }
179      assertFalse(scanner.next(cells,
180        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
181      Result result = Result.create(cells);
182      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
183      cells.clear();
184      scanner.shipped();
185    }
186    // make sure all scanners are closed.
187    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
188      assertFalse(sf.isReferencedInReads());
189    }
190  }
191
192  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
193  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
194  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
195  // an infinite loop. Need to dig more, the code are way too complicated...
196  @Ignore
197  @Test
198  public void testFilterRowKey() throws IOException {
199    testFilter(new MatchLastRowKeyFilter());
200  }
201
202  public static final class MatchLastRowCellNextColFilter extends FilterBase {
203
204    @Override
205    public ReturnCode filterCell(Cell c) throws IOException {
206      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
207        return ReturnCode.INCLUDE;
208      } else {
209        return ReturnCode.NEXT_COL;
210      }
211    }
212  }
213
214  @Test
215  public void testFilterCellNextCol() throws IOException {
216    testFilter(new MatchLastRowCellNextColFilter());
217  }
218
219  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
220
221    @Override
222    public ReturnCode filterCell(Cell c) throws IOException {
223      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
224        return ReturnCode.INCLUDE;
225      } else {
226        return ReturnCode.NEXT_ROW;
227      }
228    }
229  }
230
231  @Test
232  public void testFilterCellNextRow() throws IOException {
233    testFilter(new MatchLastRowCellNextRowFilter());
234  }
235
236  public static final class MatchLastRowFilterRowFilter extends FilterBase {
237
238    private boolean exclude;
239
240    @Override
241    public void filterRowCells(List<Cell> kvs) throws IOException {
242      Cell c = kvs.get(0);
243      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
244    }
245
246    @Override
247    public void reset() throws IOException {
248      exclude = false;
249    }
250
251    @Override
252    public boolean filterRow() throws IOException {
253      return exclude;
254    }
255
256    @Override
257    public boolean hasFilterRow() {
258      return true;
259    }
260  }
261
262  @Test
263  public void testFilterRow() throws IOException {
264    testFilter(new MatchLastRowFilterRowFilter());
265  }
266}