001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.concurrent.ThreadLocalRandom;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.client.Result;
034import org.apache.hadoop.hbase.client.Scan;
035import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
036import org.apache.hadoop.hbase.filter.Filter;
037import org.apache.hadoop.hbase.filter.FilterBase;
038import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
039import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hbase.testclassification.SmallTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.After;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Ignore;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050@Category({ RegionServerTests.class, SmallTests.class })
051public class TestSwitchToStreamRead {
052
053  @ClassRule
054  public static final HBaseClassTestRule CLASS_RULE =
055    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
056
057  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
058
059  private static TableName TABLE_NAME = TableName.valueOf("stream");
060
061  private static byte[] FAMILY = Bytes.toBytes("cf");
062
063  private static byte[] QUAL = Bytes.toBytes("cq");
064
065  private static String VALUE_PREFIX;
066
067  private static HRegion REGION;
068
069  @Before
070  public void setUp() throws IOException {
071    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
072    StringBuilder sb = new StringBuilder(256);
073    for (int i = 0; i < 255; i++) {
074      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
075    }
076    VALUE_PREFIX = sb.append("-").toString();
077    REGION = UTIL.createLocalHRegion(
078      TableDescriptorBuilder.newBuilder(TABLE_NAME)
079        .setColumnFamily(
080          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
081        .build(),
082      null, null);
083    for (int i = 0; i < 900; i++) {
084      REGION
085        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
086    }
087    REGION.flush(true);
088    for (int i = 900; i < 1000; i++) {
089      REGION
090        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
091    }
092  }
093
094  @After
095  public void tearDown() throws IOException {
096    REGION.close(true);
097    UTIL.cleanupTestDir();
098  }
099
100  @Test
101  public void test() throws IOException {
102    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
103      StoreScanner storeScanner =
104        (StoreScanner) scanner.storeHeap.getCurrentForTesting();
105      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
106        if (kvs instanceof StoreFileScanner) {
107          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
108          // starting from pread so we use shared reader here.
109          assertTrue(sfScanner.getReader().getReaderContext()
110            .getReaderType() == ReaderType.PREAD);
111        }
112      }
113      List<Cell> cells = new ArrayList<>();
114      for (int i = 0; i < 500; i++) {
115        assertTrue(scanner.next(cells));
116        Result result = Result.create(cells);
117        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
118        cells.clear();
119        scanner.shipped();
120      }
121      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
122        if (kvs instanceof StoreFileScanner) {
123          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
124          // we should have convert to use stream read now.
125          assertFalse(sfScanner.getReader().getReaderContext()
126            .getReaderType() == ReaderType.PREAD);
127        }
128      }
129      for (int i = 500; i < 1000; i++) {
130        assertEquals(i != 999, scanner.next(cells));
131        Result result = Result.create(cells);
132        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
133        cells.clear();
134        scanner.shipped();
135      }
136    }
137    // make sure all scanners are closed.
138    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
139      assertFalse(sf.isReferencedInReads());
140    }
141  }
142
143  public static final class MatchLastRowKeyFilter extends FilterBase {
144
145    @Override
146    public boolean filterRowKey(Cell cell) throws IOException {
147      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
148    }
149  }
150
151  private void testFilter(Filter filter) throws IOException {
152    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
153      StoreScanner storeScanner = (StoreScanner) scanner.storeHeap.getCurrentForTesting();
154      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
155        if (kvs instanceof StoreFileScanner) {
156          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
157          // starting from pread so we use shared reader here.
158          assertTrue(sfScanner.getReader().getReaderContext()
159            .getReaderType() == ReaderType.PREAD);
160        }
161      }
162      List<Cell> cells = new ArrayList<>();
163      // should return before finishing the scan as we want to switch from pread to stream
164      assertTrue(scanner.next(cells,
165        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
166      assertTrue(cells.isEmpty());
167      scanner.shipped();
168
169      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
170        if (kvs instanceof StoreFileScanner) {
171          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
172          // we should have convert to use stream read now.
173          assertFalse(sfScanner.getReader().getReaderContext()
174            .getReaderType() == ReaderType.PREAD);
175        }
176      }
177      assertFalse(scanner.next(cells,
178        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
179      Result result = Result.create(cells);
180      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
181      cells.clear();
182      scanner.shipped();
183    }
184    // make sure all scanners are closed.
185    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
186      assertFalse(sf.isReferencedInReads());
187    }
188  }
189
190  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
191  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
192  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
193  // an infinite loop. Need to dig more, the code are way too complicated...
194  @Ignore
195  @Test
196  public void testFilterRowKey() throws IOException {
197    testFilter(new MatchLastRowKeyFilter());
198  }
199
200  public static final class MatchLastRowCellNextColFilter extends FilterBase {
201
202    @Override
203    public ReturnCode filterCell(Cell c) throws IOException {
204      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
205        return ReturnCode.INCLUDE;
206      } else {
207        return ReturnCode.NEXT_COL;
208      }
209    }
210  }
211
212  @Test
213  public void testFilterCellNextCol() throws IOException {
214    testFilter(new MatchLastRowCellNextColFilter());
215  }
216
217  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
218
219    @Override
220    public ReturnCode filterCell(Cell c) throws IOException {
221      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
222        return ReturnCode.INCLUDE;
223      } else {
224        return ReturnCode.NEXT_ROW;
225      }
226    }
227  }
228
229  @Test
230  public void testFilterCellNextRow() throws IOException {
231    testFilter(new MatchLastRowCellNextRowFilter());
232  }
233
234  public static final class MatchLastRowFilterRowFilter extends FilterBase {
235
236    private boolean exclude;
237
238    @Override
239    public void filterRowCells(List<Cell> kvs) throws IOException {
240      Cell c = kvs.get(0);
241      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
242    }
243
244    @Override
245    public void reset() throws IOException {
246      exclude = false;
247    }
248
249    @Override
250    public boolean filterRow() throws IOException {
251      return exclude;
252    }
253
254    @Override
255    public boolean hasFilterRow() {
256      return true;
257    }
258  }
259
260  @Test
261  public void testFilterRow() throws IOException {
262    testFilter(new MatchLastRowFilterRowFilter());
263  }
264}