001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.UncheckedIOException;
025import java.util.Arrays;
026import java.util.List;
027import java.util.concurrent.ForkJoinPool;
028import java.util.function.Supplier;
029import java.util.stream.Collectors;
030import java.util.stream.IntStream;
031
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Pair;
037import org.junit.AfterClass;
038import org.junit.BeforeClass;
039import org.junit.Test;
040
041public abstract class AbstractTestAsyncTableScan {
042
043  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
044
045  protected static TableName TABLE_NAME = TableName.valueOf("async");
046
047  protected static byte[] FAMILY = Bytes.toBytes("cf");
048
049  protected static byte[] CQ1 = Bytes.toBytes("cq1");
050
051  protected static byte[] CQ2 = Bytes.toBytes("cq2");
052
053  protected static int COUNT = 1000;
054
055  protected static AsyncConnection ASYNC_CONN;
056
057  @BeforeClass
058  public static void setUp() throws Exception {
059    TEST_UTIL.startMiniCluster(3);
060    byte[][] splitKeys = new byte[8][];
061    for (int i = 111; i < 999; i += 111) {
062      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
063    }
064    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
065    TEST_UTIL.waitTableAvailable(TABLE_NAME);
066    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
067    ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
068        .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
069            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
070        .collect(Collectors.toList())).get();
071  }
072
073  @AfterClass
074  public static void tearDown() throws Exception {
075    ASYNC_CONN.close();
076    TEST_UTIL.shutdownMiniCluster();
077  }
078
079  protected static Scan createNormalScan() {
080    return new Scan();
081  }
082
083  protected static Scan createBatchScan() {
084    return new Scan().setBatch(1);
085  }
086
087  // set a small result size for testing flow control
088  protected static Scan createSmallResultSizeScan() {
089    return new Scan().setMaxResultSize(1);
090  }
091
092  protected static Scan createBatchSmallResultSizeScan() {
093    return new Scan().setBatch(1).setMaxResultSize(1);
094  }
095
096  protected static AsyncTable<?> getRawTable() {
097    return ASYNC_CONN.getTable(TABLE_NAME);
098  }
099
100  protected static AsyncTable<?> getTable() {
101    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
102  }
103
104  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
105    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
106      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
107      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
108      Pair.newPair("batchSmallResultSize",
109        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
110  }
111
112  protected static List<Object[]> getScanCreatorParams() {
113    return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
114        .collect(Collectors.toList());
115  }
116
117  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
118    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
119      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
120  }
121
122  protected static List<Object[]> getTableAndScanCreatorParams() {
123    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
124    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
125    return tableCreator.stream()
126        .flatMap(tp -> scanCreator.stream().map(
127          sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
128        .collect(Collectors.toList());
129  }
130
131  protected abstract Scan createScan();
132
133  protected abstract List<Result> doScan(Scan scan) throws Exception;
134
135  protected final List<Result> convertFromBatchResult(List<Result> results) {
136    assertTrue(results.size() % 2 == 0);
137    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
138      try {
139        return Result
140            .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
141      } catch (IOException e) {
142        throw new UncheckedIOException(e);
143      }
144    }).collect(Collectors.toList());
145  }
146
147  @Test
148  public void testScanAll() throws Exception {
149    List<Result> results = doScan(createScan());
150    // make sure all scanners are closed at RS side
151    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
152        .forEach(
153          rs -> assertEquals(
154            "The scanner count of " + rs.getServerName() + " is " +
155              rs.getRSRpcServices().getScannersCount(),
156            0, rs.getRSRpcServices().getScannersCount()));
157    assertEquals(COUNT, results.size());
158    IntStream.range(0, COUNT).forEach(i -> {
159      Result result = results.get(i);
160      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
161      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
162    });
163  }
164
165  private void assertResultEquals(Result result, int i) {
166    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
167    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
168    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
169  }
170
171  @Test
172  public void testReversedScanAll() throws Exception {
173    List<Result> results = doScan(createScan().setReversed(true));
174    assertEquals(COUNT, results.size());
175    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
176  }
177
178  @Test
179  public void testScanNoStopKey() throws Exception {
180    int start = 345;
181    List<Result> results =
182      doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))));
183    assertEquals(COUNT - start, results.size());
184    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
185  }
186
187  @Test
188  public void testReverseScanNoStopKey() throws Exception {
189    int start = 765;
190    List<Result> results = doScan(
191      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
192    assertEquals(start + 1, results.size());
193    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
194  }
195
196  @Test
197  public void testScanWrongColumnFamily() throws Exception {
198    try {
199      doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")));
200    } catch (Exception e) {
201      assertTrue(e instanceof NoSuchColumnFamilyException ||
202        e.getCause() instanceof NoSuchColumnFamilyException);
203    }
204  }
205
206  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
207      int limit) throws Exception {
208    Scan scan =
209      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
210          .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
211    if (limit > 0) {
212      scan.setLimit(limit);
213    }
214    List<Result> results = doScan(scan);
215    int actualStart = startInclusive ? start : start + 1;
216    int actualStop = stopInclusive ? stop + 1 : stop;
217    int count = actualStop - actualStart;
218    if (limit > 0) {
219      count = Math.min(count, limit);
220    }
221    assertEquals(count, results.size());
222    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
223  }
224
225  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
226      int limit) throws Exception {
227    Scan scan =
228      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
229          .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
230    if (limit > 0) {
231      scan.setLimit(limit);
232    }
233    List<Result> results = doScan(scan);
234    int actualStart = startInclusive ? start : start - 1;
235    int actualStop = stopInclusive ? stop - 1 : stop;
236    int count = actualStart - actualStop;
237    if (limit > 0) {
238      count = Math.min(count, limit);
239    }
240    assertEquals(count, results.size());
241    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
242  }
243
244  @Test
245  public void testScanWithStartKeyAndStopKey() throws Exception {
246    testScan(1, true, 998, false, -1); // from first region to last region
247    testScan(123, true, 345, true, -1);
248    testScan(234, true, 456, false, -1);
249    testScan(345, false, 567, true, -1);
250    testScan(456, false, 678, false, -1);
251  }
252
253  @Test
254  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
255    testReversedScan(998, true, 1, false, -1); // from last region to first region
256    testReversedScan(543, true, 321, true, -1);
257    testReversedScan(654, true, 432, false, -1);
258    testReversedScan(765, false, 543, true, -1);
259    testReversedScan(876, false, 654, false, -1);
260  }
261
262  @Test
263  public void testScanAtRegionBoundary() throws Exception {
264    testScan(222, true, 333, true, -1);
265    testScan(333, true, 444, false, -1);
266    testScan(444, false, 555, true, -1);
267    testScan(555, false, 666, false, -1);
268  }
269
270  @Test
271  public void testReversedScanAtRegionBoundary() throws Exception {
272    testReversedScan(333, true, 222, true, -1);
273    testReversedScan(444, true, 333, false, -1);
274    testReversedScan(555, false, 444, true, -1);
275    testReversedScan(666, false, 555, false, -1);
276  }
277
278  @Test
279  public void testScanWithLimit() throws Exception {
280    testScan(1, true, 998, false, 900); // from first region to last region
281    testScan(123, true, 234, true, 100);
282    testScan(234, true, 456, false, 100);
283    testScan(345, false, 567, true, 100);
284    testScan(456, false, 678, false, 100);
285  }
286
287  @Test
288  public void testScanWithLimitGreaterThanActualCount() throws Exception {
289    testScan(1, true, 998, false, 1000); // from first region to last region
290    testScan(123, true, 345, true, 200);
291    testScan(234, true, 456, false, 200);
292    testScan(345, false, 567, true, 200);
293    testScan(456, false, 678, false, 200);
294  }
295
296  @Test
297  public void testReversedScanWithLimit() throws Exception {
298    testReversedScan(998, true, 1, false, 900); // from last region to first region
299    testReversedScan(543, true, 321, true, 100);
300    testReversedScan(654, true, 432, false, 100);
301    testReversedScan(765, false, 543, true, 100);
302    testReversedScan(876, false, 654, false, 100);
303  }
304
305  @Test
306  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
307    testReversedScan(998, true, 1, false, 1000); // from last region to first region
308    testReversedScan(543, true, 321, true, 200);
309    testReversedScan(654, true, 432, false, 200);
310    testReversedScan(765, false, 543, true, 200);
311    testReversedScan(876, false, 654, false, 200);
312  }
313}