/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestSwitchToStreamRead {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  private static TableName TABLE_NAME = TableName.valueOf("stream");

  private static byte[] FAMILY = Bytes.toBytes("cf");

  private static byte[] QUAL = Bytes.toBytes("cq");

  private static String VALUE_PREFIX;

  private static HRegion REGION;

  @Before
  public void setUp() throws IOException {
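    // Lower the threshold at which StoreScanner switches from pread to stream read, so that the
    // switch happens after roughly 2 KB have been read.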
    UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
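    // Build a roughly 256 character random prefix so that every cell value is fairly large.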
    StringBuilder sb = new StringBuilder(256);
    for (int i = 0; i < 255; i++) {
      sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
    }
    VALUE_PREFIX = sb.append("-").toString();
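    // Use a small (1 KB) block size for the column family so a scan crosses block boundaries
    // after only a few rows.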
    REGION = UTIL.createLocalHRegion(
      TableDescriptorBuilder.newBuilder(TABLE_NAME)
          .setColumnFamily(
            ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
          .build(),
      null, null);
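    // Write 900 rows and flush them, so they all end up in a single store file.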
    for (int i = 0; i < 900; i++) {
      REGION
          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
    REGION.flush(true);
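    // Keep the last 100 rows in the memstore only, so scans read from both the store file and
    // the memstore.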
    for (int i = 900; i < 1000; i++) {
      REGION
          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
    }
  }

  @After
  public void tearDown() throws IOException {
    REGION.close(true);
    UTIL.cleanupTestDir();
  }

  private Set<StoreFileReader> getStreamReaders() {
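    // Returns an unmodifiable view over the single store file's live streamReaders set, so its
    // size reflects stream readers that are opened or closed later.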
    List<HStore> stores = REGION.getStores();
    Assert.assertEquals(1, stores.size());
    HStore firstStore = stores.get(0);
    Assert.assertNotNull(firstStore);
    Collection<HStoreFile> storeFiles = firstStore.getStorefiles();
    Assert.assertEquals(1, storeFiles.size());
    HStoreFile firstStoreFile = storeFiles.iterator().next();
    Assert.assertNotNull(firstStoreFile);
    return Collections.unmodifiableSet(firstStoreFile.streamReaders);
  }

  /**
   * Test case for HBASE-21551: stream readers opened for a STREAM scan must be released when the
   * scanner is closed, and also when the region is closed while stream scanners are still open.
   */
  @Test
  public void testStreamReadersCleanup() throws IOException {
    Set<StoreFileReader> streamReaders = getStreamReaders();
    Assert.assertEquals(0, getStreamReaders().size());
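    // A STREAM scan should open a dedicated, non-shared reader on the store file.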
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM))) {
      StoreScanner storeScanner =
          (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
      List<StoreFileScanner> sfScanners = storeScanner.getAllScannersForTesting().stream()
          .filter(kvs -> kvs instanceof StoreFileScanner).map(kvs -> (StoreFileScanner) kvs)
          .collect(Collectors.toList());
      Assert.assertEquals(1, sfScanners.size());
      StoreFileScanner sfScanner = sfScanners.get(0);
      Assert.assertFalse(sfScanner.getReader().shared);

      // There should be a stream reader
      Assert.assertEquals(1, getStreamReaders().size());
    }
    Assert.assertEquals(0, getStreamReaders().size());

    // The stream readers should be cleared after the region is closed, even if there are still
    // open stream scanners.
    RegionScannerImpl scanner = REGION.getScanner(new Scan().setReadType(ReadType.STREAM));
    Assert.assertNotNull(scanner);
    Assert.assertEquals(1, getStreamReaders().size());
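    // Closing the region must release the stream reader even though the scanner is still open;
    // streamReaders is a live view, so it observes the cleanup.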
    REGION.close();
    Assert.assertEquals(0, streamReaders.size());
  }

  @Test
  public void test() throws IOException {
    try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
      StoreScanner storeScanner = (StoreScanner) (scanner)
          .getStoreHeapForTesting().getCurrentForTesting();
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // We start with pread, so the shared reader is used here.
          assertTrue(sfScanner.getReader().shared);
        }
      }
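      // Read the first 500 rows; once the bytes read exceed STORESCANNER_PREAD_MAX_BYTES the
      // store scanner should switch from pread to stream read, which is verified below.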
      List<Cell> cells = new ArrayList<>();
      for (int i = 0; i < 500; i++) {
        assertTrue(scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
        if (kvs instanceof StoreFileScanner) {
          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
          // We should have switched to stream read by now.
          assertFalse(sfScanner.getReader().shared);
        }
      }
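      // Scan the remaining rows, which come partly from the store file and partly from the
      // memstore; next() returns false once the last row (999) has been read.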
      for (int i = 500; i < 1000; i++) {
        assertEquals(i != 999, scanner.next(cells));
        Result result = Result.create(cells);
        assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
        cells.clear();
        scanner.shipped();
      }
    }
    // make sure all scanners are closed.
    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
      assertFalse(sf.isReferencedInReads());
    }
  }
}