001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Collection;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.stream.Collectors;
032
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.Scan.ReadType;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.filter.Filter;
044import org.apache.hadoop.hbase.filter.FilterBase;
045import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
046import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.RegionServerTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.junit.After;
051import org.junit.Assert;
052import org.junit.Before;
053import org.junit.ClassRule;
054import org.junit.Ignore;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057
058@Category({ RegionServerTests.class, MediumTests.class })
059public class TestSwitchToStreamRead {
060
061  @ClassRule
062  public static final HBaseClassTestRule CLASS_RULE =
063    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
064
065  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
066
067  private static TableName TABLE_NAME = TableName.valueOf("stream");
068
069  private static byte[] FAMILY = Bytes.toBytes("cf");
070
071  private static byte[] QUAL = Bytes.toBytes("cq");
072
073  private static String VALUE_PREFIX;
074
075  private static HRegion REGION;
076
077  @Before
078  public void setUp() throws IOException {
079    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
080    StringBuilder sb = new StringBuilder(256);
081    for (int i = 0; i < 255; i++) {
082      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
083    }
084    VALUE_PREFIX = sb.append("-").toString();
085    REGION = UTIL.createLocalHRegion(
086      TableDescriptorBuilder.newBuilder(TABLE_NAME)
087        .setColumnFamily(
088          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
089        .build(),
090      null, null);
091    for (int i = 0; i < 900; i++) {
092      REGION
093        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
094    }
095    REGION.flush(true);
096    for (int i = 900; i < 1000; i++) {
097      REGION
098        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
099    }
100  }
101
102  @After
103  public void tearDown() throws IOException {
104    REGION.close(true);
105    UTIL.cleanupTestDir();
106  }
107
108  private Set<StoreFileReader> getStreamReaders() {
109    List<HStore> stores = REGION.getStores();
110    Assert.assertEquals(1, stores.size());
111    HStore firstStore = stores.get(0);
112    Assert.assertNotNull(firstStore);
113    Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
114    Assert.assertEquals(1, storeFiles.size());
115    HStoreFile firstSToreFile = storeFiles.iterator().next();
116    Assert.assertNotNull(firstSToreFile);
117    return Collections.unmodifiableSet(firstSToreFile.streamReaders);
118  }
119
120  /**
121   * Test Case for HBASE-21551
122   */
123  @Test
124  public void testStreamReadersCleanup() throws IOException {
125    Set<StoreFileReader> streamReaders = getStreamReaders();
126    Assert.assertEquals(0, getStreamReaders().size());
127    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
128      StoreScanner storeScanner =
129          (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
130      List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
131          .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
132          .collect(Collectors.toList());
133      Assert.assertEquals(1, sfScanners.size());
134      StoreFileScanner sfScanner = sfScanners.get(0);
135      Assert.assertFalse(sfScanner.getReader().shared);
136
137      // There should be a stream reader
138      Assert.assertEquals(1, getStreamReaders().size());
139    }
140    Assert.assertEquals(0, getStreamReaders().size());
141
142    // The streamsReader should be clear after region close even if there're some opened stream
143    // scanner.
144    RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
145    Assert.assertNotNull(scanner);
146    Assert.assertEquals(1, getStreamReaders().size());
147    REGION.close();
148    Assert.assertEquals(0, streamReaders.size());
149  }
150
151  @Test
152  public void test() throws IOException {
153    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
154      StoreScanner storeScanner =
155        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
156      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
157        if (kvs instanceof StoreFileScanner) {
158          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
159          // starting from pread so we use shared reader here.
160          assertTrue(sfScanner.getReader().shared);
161        }
162      }
163      List<Cell> cells = new ArrayList<>();
164      for (int i = 0; i < 500; i++) {
165        assertTrue(scanner.next(cells));
166        Result result = Result.create(cells);
167        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
168        cells.clear();
169        scanner.shipped();
170      }
171      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
172        if (kvs instanceof StoreFileScanner) {
173          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
174          // we should have convert to use stream read now.
175          assertFalse(sfScanner.getReader().shared);
176        }
177      }
178      for (int i = 500; i < 1000; i++) {
179        assertEquals(i != 999, scanner.next(cells));
180        Result result = Result.create(cells);
181        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
182        cells.clear();
183        scanner.shipped();
184      }
185    }
186    // make sure all scanners are closed.
187    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
188      assertFalse(sf.isReferencedInReads());
189    }
190  }
191
192  public static final class MatchLastRowKeyFilter extends FilterBase {
193
194    @Override
195    public boolean filterRowKey(Cell cell) throws IOException {
196      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
197    }
198  }
199
200  private void testFilter(Filter filter) throws IOException {
201    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
202      StoreScanner storeScanner =
203        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
204      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
205        if (kvs instanceof StoreFileScanner) {
206          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
207          // starting from pread so we use shared reader here.
208          assertTrue(sfScanner.getReader().shared);
209        }
210      }
211      List<Cell> cells = new ArrayList<>();
212      // should return before finishing the scan as we want to switch from pread to stream
213      assertTrue(scanner.next(cells,
214        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
215      assertTrue(cells.isEmpty());
216      scanner.shipped();
217
218      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
219        if (kvs instanceof StoreFileScanner) {
220          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
221          // we should have convert to use stream read now.
222          assertFalse(sfScanner.getReader().shared);
223        }
224      }
225      assertFalse(scanner.next(cells,
226        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
227      Result result = Result.create(cells);
228      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
229      cells.clear();
230      scanner.shipped();
231    }
232    // make sure all scanners are closed.
233    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
234      assertFalse(sf.isReferencedInReads());
235    }
236  }
237
238  // We use a different logic to implement filterRowKey, where we will keep calling kvHeap.next
239  // until the row key is changed. And there we can only use NoLimitScannerContext so we can not
240  // make the upper layer return immediately. Simply do not use NoLimitScannerContext will lead to
241  // an infinite loop. Need to dig more, the code are way too complicated...
242  @Ignore
243  @Test
244  public void testFilterRowKey() throws IOException {
245    testFilter(new MatchLastRowKeyFilter());
246  }
247
248  public static final class MatchLastRowCellNextColFilter extends FilterBase {
249
250    @Override
251    public ReturnCode filterCell(Cell c) throws IOException {
252      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
253        return ReturnCode.INCLUDE;
254      } else {
255        return ReturnCode.NEXT_COL;
256      }
257    }
258  }
259
260  @Test
261  public void testFilterCellNextCol() throws IOException {
262    testFilter(new MatchLastRowCellNextColFilter());
263  }
264
265  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
266
267    @Override
268    public ReturnCode filterCell(Cell c) throws IOException {
269      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
270        return ReturnCode.INCLUDE;
271      } else {
272        return ReturnCode.NEXT_ROW;
273      }
274    }
275  }
276
277  @Test
278  public void testFilterCellNextRow() throws IOException {
279    testFilter(new MatchLastRowCellNextRowFilter());
280  }
281
282  public static final class MatchLastRowFilterRowFilter extends FilterBase {
283
284    private boolean exclude;
285
286    @Override
287    public void filterRowCells(List<Cell> kvs) throws IOException {
288      Cell c = kvs.get(0);
289      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
290    }
291
292    @Override
293    public void reset() throws IOException {
294      exclude = false;
295    }
296
297    @Override
298    public boolean filterRow() throws IOException {
299      return exclude;
300    }
301
302    @Override
303    public boolean hasFilterRow() {
304      return true;
305    }
306  }
307
308  @Test
309  public void testFilterRow() throws IOException {
310    testFilter(new MatchLastRowFilterRowFilter());
311  }
312}