002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ThreadLocalRandom;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.Scan;
036import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
037import org.apache.hadoop.hbase.filter.Filter;
038import org.apache.hadoop.hbase.filter.FilterBase;
039import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
040import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
041import org.apache.hadoop.hbase.testclassification.RegionServerTests;
042import org.apache.hadoop.hbase.testclassification.SmallTests;
043import org.apache.hadoop.hbase.util.Bytes;
044import org.junit.After;
045import org.junit.Before;
046import org.junit.ClassRule;
047import org.junit.Ignore;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
051@Category({ RegionServerTests.class, SmallTests.class })
052public class TestSwitchToStreamRead {
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
058  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
060  private static TableName TABLE_NAME = TableName.valueOf("stream");
062  private static byte[] FAMILY = Bytes.toBytes("cf");
064  private static byte[] QUAL = Bytes.toBytes("cq");
066  private static String VALUE_PREFIX;
068  private static HRegion REGION;
070  @Before
071  public void setUp() throws IOException {
072    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
073    StringBuilder sb = new StringBuilder(256);
074    for (int i = 0; i < 255; i++) {
075      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
076    }
077    VALUE_PREFIX = sb.append("-").toString();
078    REGION = UTIL.createLocalHRegion(
079      TableDescriptorBuilder.newBuilder(TABLE_NAME)
080        .setColumnFamily(
081          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
082        .build(),
083      null, null);
084    for (int i = 0; i < 900; i++) {
085      REGION
086        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
087    }
088    REGION.flush(true);
089    for (int i = 900; i < 1000; i++) {
090      REGION
091        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
092    }
093  }
095  @After
096  public void tearDown() throws IOException {
097    REGION.close(true);
098    UTIL.cleanupTestDir();
099  }
101  @Test
102  public void test() throws IOException {
103    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
104      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
105      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
106        if (kvs instanceof StoreFileScanner) {
107          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
108          // starting from pread so we use shared reader here.
109          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
110        }
111      }
112      List<Cell> cells = new ArrayList<>();
113      for (int i = 0; i < 500; i++) {
114        assertTrue(scanner.next(cells));
115        Result result = Result.create(cells);
116        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
117        cells.clear();
118        scanner.shipped();
119      }
120      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
121        if (kvs instanceof StoreFileScanner) {
122          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
123          // we should have convert to use stream read now.
124          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
125        }
126      }
127      for (int i = 500; i < 1000; i++) {
128        assertEquals(i != 999, scanner.next(cells));
129        Result result = Result.create(cells);
130        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
131        cells.clear();
132        scanner.shipped();
133      }
134    }
135    // make sure all scanners are closed.
136    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
137      assertFalse(sf.isReferencedInReads());
138    }
139  }
141  public static final class MatchLastRowKeyFilter extends FilterBase {
143    @Override
144    public boolean filterRowKey(Cell cell) throws IOException {
145      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
146    }
147  }
149  private void testFilter(Filter filter) throws IOException {
150    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
151      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
152      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
153        if (kvs instanceof StoreFileScanner) {
154          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
155          // starting from pread so we use shared reader here.
156          assertTrue(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
157        }
158      }
159      List<Cell> cells = new ArrayList<>();
160      // should return before finishing the scan as we want to switch from pread to stream
161      assertTrue(scanner.next(cells,
162        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
163      assertTrue(cells.isEmpty());
164      scanner.shipped();
166      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
167        if (kvs instanceof StoreFileScanner) {
168          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
169          // we should have convert to use stream read now.
170          assertFalse(sfScanner.getReader().getReaderContext().getReaderType() == ReaderType.PREAD);
171        }
172      }
173      assertFalse(scanner.next(cells,
174        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
175      Result result = Result.create(cells);
176      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
177      cells.clear();
178      scanner.shipped();
179    }
180    // make sure all scanners are closed.
181    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
182      assertFalse(sf.isReferencedInReads());
183    }
184  }
186  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
187  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
188  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
189  // an infinite loop. Need to dig more, the code are way too complicated...
190  @Ignore
191  @Test
192  public void testFilterRowKey() throws IOException {
193    testFilter(new MatchLastRowKeyFilter());
194  }
196  public static final class MatchLastRowCellNextColFilter extends FilterBase {
198    @Override
199    public ReturnCode filterCell(Cell c) throws IOException {
200      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
201        return ReturnCode.INCLUDE;
202      } else {
203        return ReturnCode.NEXT_COL;
204      }
205    }
206  }
208  @Test
209  public void testFilterCellNextCol() throws IOException {
210    testFilter(new MatchLastRowCellNextColFilter());
211  }
213  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
215    @Override
216    public ReturnCode filterCell(Cell c) throws IOException {
217      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
218        return ReturnCode.INCLUDE;
219      } else {
220        return ReturnCode.NEXT_ROW;
221      }
222    }
223  }
225  @Test
226  public void testFilterCellNextRow() throws IOException {
227    testFilter(new MatchLastRowCellNextRowFilter());
228  }
230  public static final class MatchLastRowFilterRowFilter extends FilterBase {
232    private boolean exclude;
234    @Override
235    public void filterRowCells(List<Cell> kvs) throws IOException {
236      Cell c = kvs.get(0);
237      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
238    }
240    @Override
241    public void reset() throws IOException {
242      exclude = false;
243    }
245    @Override
246    public boolean filterRow() throws IOException {
247      return exclude;
248    }
250    @Override
251    public boolean hasFilterRow() {
252      return true;
253    }
254  }
256  @Test
257  public void testFilterRow() throws IOException {
258    testFilter(new MatchLastRowFilterRowFilter());
259  }