001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.concurrent.ThreadLocalRandom;
028
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
034import org.apache.hadoop.hbase.client.Put;
035import org.apache.hadoop.hbase.client.Result;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
038import org.apache.hadoop.hbase.filter.Filter;
039import org.apache.hadoop.hbase.filter.FilterBase;
040import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
041import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
042import org.apache.hadoop.hbase.testclassification.MediumTests;
043import org.apache.hadoop.hbase.testclassification.RegionServerTests;
044import org.apache.hadoop.hbase.util.Bytes;
045import org.junit.After;
046import org.junit.Before;
047import org.junit.ClassRule;
048import org.junit.Ignore;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051
052@Category({ RegionServerTests.class, MediumTests.class })
053public class TestSwitchToStreamRead {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
058
059  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
060
061  private static TableName TABLE_NAME = TableName.valueOf("stream");
062
063  private static byte[] FAMILY = Bytes.toBytes("cf");
064
065  private static byte[] QUAL = Bytes.toBytes("cq");
066
067  private static String VALUE_PREFIX;
068
069  private static HRegion REGION;
070
071  @Before
072  public void setUp() throws IOException {
073    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
074    StringBuilder sb = new StringBuilder(256);
075    for (int i = 0; i < 255; i++) {
076      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
077    }
078    VALUE_PREFIX = sb.append("-").toString();
079    REGION = UTIL.createLocalHRegion(
080      TableDescriptorBuilder.newBuilder(TABLE_NAME)
081        .setColumnFamily(
082          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
083        .build(),
084      null, null);
085    for (int i = 0; i < 900; i++) {
086      REGION
087        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
088    }
089    REGION.flush(true);
090    for (int i = 900; i < 1000; i++) {
091      REGION
092        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
093    }
094  }
095
096  @After
097  public void tearDown() throws IOException {
098    REGION.close(true);
099    UTIL.cleanupTestDir();
100  }
101
102  @Test
103  public void test() throws IOException {
104    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
105      StoreScanner storeScanner =
106        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
107      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
108        if (kvs instanceof StoreFileScanner) {
109          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
110          // starting from pread so we use shared reader here.
111          assertTrue(sfScanner.getReader().shared);
112        }
113      }
114      List<Cell> cells = new ArrayList<>();
115      for (int i = 0; i < 500; i++) {
116        assertTrue(scanner.next(cells));
117        Result result = Result.create(cells);
118        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
119        cells.clear();
120        scanner.shipped();
121      }
122      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
123        if (kvs instanceof StoreFileScanner) {
124          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
125          // we should have convert to use stream read now.
126          assertFalse(sfScanner.getReader().shared);
127        }
128      }
129      for (int i = 500; i < 1000; i++) {
130        assertEquals(i != 999, scanner.next(cells));
131        Result result = Result.create(cells);
132        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
133        cells.clear();
134        scanner.shipped();
135      }
136    }
137    // make sure all scanners are closed.
138    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
139      assertFalse(sf.isReferencedInReads());
140    }
141  }
142
143  public static final class MatchLastRowKeyFilter extends FilterBase {
144
145    @Override
146    public boolean filterRowKey(Cell cell) throws IOException {
147      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
148    }
149  }
150
151  private void testFilter(Filter filter) throws IOException {
152    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
153      StoreScanner storeScanner =
154        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
155      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
156        if (kvs instanceof StoreFileScanner) {
157          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
158          // starting from pread so we use shared reader here.
159          assertTrue(sfScanner.getReader().shared);
160        }
161      }
162      List<Cell> cells = new ArrayList<>();
163      // should return before finishing the scan as we want to switch from pread to stream
164      assertTrue(scanner.next(cells,
165        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
166      assertTrue(cells.isEmpty());
167      scanner.shipped();
168
169      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
170        if (kvs instanceof StoreFileScanner) {
171          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
172          // we should have convert to use stream read now.
173          assertFalse(sfScanner.getReader().shared);
174        }
175      }
176      assertFalse(scanner.next(cells,
177        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
178      Result result = Result.create(cells);
179      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
180      cells.clear();
181      scanner.shipped();
182    }
183    // make sure all scanners are closed.
184    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
185      assertFalse(sf.isReferencedInReads());
186    }
187  }
188
189  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
190  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
191  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
192  // an infinite loop. Need to dig more, the code are way too complicated...
193  @Ignore
194  @Test
195  public void testFilterRowKey() throws IOException {
196    testFilter(new MatchLastRowKeyFilter());
197  }
198
199  public static final class MatchLastRowCellNextColFilter extends FilterBase {
200
201    @Override
202    public ReturnCode filterCell(Cell c) throws IOException {
203      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
204        return ReturnCode.INCLUDE;
205      } else {
206        return ReturnCode.NEXT_COL;
207      }
208    }
209  }
210
211  @Test
212  public void testFilterCellNextCol() throws IOException {
213    testFilter(new MatchLastRowCellNextColFilter());
214  }
215
216  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
217
218    @Override
219    public ReturnCode filterCell(Cell c) throws IOException {
220      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
221        return ReturnCode.INCLUDE;
222      } else {
223        return ReturnCode.NEXT_ROW;
224      }
225    }
226  }
227
228  @Test
229  public void testFilterCellNextRow() throws IOException {
230    testFilter(new MatchLastRowCellNextRowFilter());
231  }
232
233  public static final class MatchLastRowFilterRowFilter extends FilterBase {
234
235    private boolean exclude;
236
237    @Override
238    public void filterRowCells(List<Cell> kvs) throws IOException {
239      Cell c = kvs.get(0);
240      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
241    }
242
243    @Override
244    public void reset() throws IOException {
245      exclude = false;
246    }
247
248    @Override
249    public boolean filterRow() throws IOException {
250      return exclude;
251    }
252
253    @Override
254    public boolean hasFilterRow() {
255      return true;
256    }
257  }
258
259  @Test
260  public void testFilterRow() throws IOException {
261    testFilter(new MatchLastRowFilterRowFilter());
262  }
263}