/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Shared test cases for scanning a table through the async client.
 * <p>
 * {@link #setUp()} creates the table {@code async}, pre-split at the row keys {@code 111},
 * {@code 222}, ..., {@code 888}, and loads {@link #COUNT} rows. Row {@code i} has the key
 * {@code String.format("%03d", i)} and stores {@code i} in {@code cf:cq1} and {@code i * i} in
 * {@code cf:cq2}.
 * <p>
 * Subclasses decide which {@link Scan} is exercised ({@link #createScan()}) and how it is
 * executed ({@link #doScan(Scan)}). The test cases assume {@link #doScan(Scan)} returns one
 * complete {@link Result} per row, in scan order.
 */
public abstract class AbstractTestAsyncTableScan {

  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static TableName TABLE_NAME = TableName.valueOf("async");

  protected static byte[] FAMILY = Bytes.toBytes("cf");

  protected static byte[] CQ1 = Bytes.toBytes("cq1");

  protected static byte[] CQ2 = Bytes.toBytes("cq2");

  /** Number of rows loaded into the test table by {@link #setUp()}. */
  protected static int COUNT = 1000;

  protected static AsyncConnection ASYNC_CONN;

  /**
   * Starts the mini cluster, creates the pre-split test table and loads {@link #COUNT} rows into
   * it through a freshly created async connection.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(3);
    // 8 split keys: "111", "222", ..., "888".
    byte[][] splitKeys = new byte[8][];
    for (int i = 111; i < 999; i += 111) {
      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
    }
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
    TEST_UTIL.waitTableAvailable(TABLE_NAME);
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT)
        .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
        .collect(Collectors.toList())).get();
  }

  /**
   * Closes the async connection and shuts the mini cluster down.
   * <p>
   * The cluster is shut down even when closing the connection fails, and the connection is only
   * closed if {@link #setUp()} got far enough to create it.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      if (ASYNC_CONN != null) {
        ASYNC_CONN.close();
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /** Returns a scan with default settings. */
  protected static Scan createNormalScan() {
    return new Scan();
  }

  /** Returns a scan with the batch size set to 1. */
  protected static Scan createBatchScan() {
    return new Scan().setBatch(1);
  }

  // set a small result size for testing flow control
  protected static Scan createSmallResultSizeScan() {
    return new Scan().setMaxResultSize(1);
  }

  /** Returns a scan with both the batch size and the max result size set to 1. */
  protected static Scan createBatchSmallResultSizeScan() {
    return new Scan().setBatch(1).setMaxResultSize(1);
  }

  /** Returns the test table obtained from the connection without an explicit executor. */
  protected static AsyncTable<?> getRawTable() {
    return ASYNC_CONN.getTable(TABLE_NAME);
  }

  /** Returns the test table obtained from the connection with the common fork-join pool. */
  protected static AsyncTable<?> getTable() {
    return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
  }

  /** Lists the available scan factories, each paired with a display name. */
  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
      Pair.newPair("batchSmallResultSize",
        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
  }

  /**
   * Returns one {@code {scanName, scanSupplier}} array per scan factory.
   */
  protected static List<Object[]> getScanCreatorParams() {
    return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
        .collect(Collectors.toList());
  }

  /** Lists the available table factories, each paired with a display name. */
  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
  }

  /**
   * Returns one {@code {tableName, tableSupplier, scanName, scanSupplier}} array for every
   * combination of table factory and scan factory.
   */
  protected static List<Object[]> getTableAndScanCreatorParams() {
    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
    return tableCreator.stream()
        .flatMap(tp -> scanCreator.stream().map(
          sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
        .collect(Collectors.toList());
  }

  /** Creates the scan that every test case starts from. */
  protected abstract Scan createScan();

  /** Executes the given scan and returns its results. */
  protected abstract List<Result> doScan(Scan scan) throws Exception;

  /**
   * Merges every two consecutive partial results into one complete result.
   * <p>
   * Asserts that the number of incoming results is even.
   * @param results partial results, two per row
   * @return the merged results, half as many as the input
   */
  protected final List<Result> convertFromBatchResult(List<Result> results) {
    assertTrue(results.size() % 2 == 0);
    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
      try {
        return Result
            .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }).collect(Collectors.toList());
  }

  @Test
  public void testScanAll() throws Exception {
    List<Result> results = doScan(createScan());
    // make sure all scanners are closed at RS side
    TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer())
        .forEach(
          rs -> assertEquals(
            "The scanner count of " + rs.getServerName() + " is " +
              rs.getRSRpcServices().getScannersCount(),
            0, rs.getRSRpcServices().getScannersCount()));
    assertEquals(COUNT, results.size());
    // Use the shared helper so that cq2 is verified here too, as in every other test case.
    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), i));
  }

  /**
   * Asserts that {@code result} is row {@code i} as written by {@link #setUp()}: the expected row
   * key, {@code i} in cq1 and {@code i * i} in cq2.
   */
  private void assertResultEquals(Result result, int i) {
    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
  }

  @Test
  public void testReversedScanAll() throws Exception {
    List<Result> results = doScan(createScan().setReversed(true));
    assertEquals(COUNT, results.size());
    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
  }

  @Test
  public void testScanNoStopKey() throws Exception {
    int start = 345;
    List<Result> results =
      doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))));
    assertEquals(COUNT - start, results.size());
    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
  }

  @Test
  public void testReverseScanNoStopKey() throws Exception {
    int start = 765;
    List<Result> results = doScan(
      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true));
    assertEquals(start + 1, results.size());
    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
  }

  /**
   * Scanning a column family that does not exist must fail with a
   * {@link NoSuchColumnFamilyException}, either thrown directly or as the cause of the thrown
   * exception.
   */
  @Test
  public void testScanWrongColumnFamily() throws Exception {
    Exception error = null;
    try {
      doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")));
    } catch (Exception e) {
      error = e;
    }
    // Without this check the test would pass silently when the scan does not fail at all.
    assertTrue("Scanning a non-existent column family should have failed", error != null);
    assertTrue("Unexpected exception: " + error, error instanceof NoSuchColumnFamilyException ||
      error.getCause() instanceof NoSuchColumnFamilyException);
  }

  /**
   * Runs a forward scan over the given row range and verifies the returned rows.
   * @param start index of the start row
   * @param startInclusive whether the start row is part of the range
   * @param stop index of the stop row
   * @param stopInclusive whether the stop row is part of the range
   * @param limit row limit to set on the scan; values that are not positive mean no limit is set
   */
  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
      int limit) throws Exception {
    Scan scan =
      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
          .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
    if (limit > 0) {
      scan.setLimit(limit);
    }
    List<Result> results = doScan(scan);
    // Normalize to a half-open range [actualStart, actualStop).
    int actualStart = startInclusive ? start : start + 1;
    int actualStop = stopInclusive ? stop + 1 : stop;
    int count = actualStop - actualStart;
    if (limit > 0) {
      count = Math.min(count, limit);
    }
    assertEquals(count, results.size());
    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
  }

  /**
   * Runs a reversed scan over the given row range and verifies the returned rows.
   * @param start index of the start row (the highest row of the range)
   * @param startInclusive whether the start row is part of the range
   * @param stop index of the stop row (the lowest row of the range)
   * @param stopInclusive whether the stop row is part of the range
   * @param limit row limit to set on the scan; values that are not positive mean no limit is set
   */
  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
      int limit) throws Exception {
    Scan scan =
      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
          .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
    if (limit > 0) {
      scan.setLimit(limit);
    }
    List<Result> results = doScan(scan);
    // Normalize to a descending half-open range: from actualStart down to, but excluding,
    // actualStop.
    int actualStart = startInclusive ? start : start - 1;
    int actualStop = stopInclusive ? stop - 1 : stop;
    int count = actualStart - actualStop;
    if (limit > 0) {
      count = Math.min(count, limit);
    }
    assertEquals(count, results.size());
    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
  }

  @Test
  public void testScanWithStartKeyAndStopKey() throws Exception {
    testScan(1, true, 998, false, -1); // from first region to last region
    testScan(123, true, 345, true, -1);
    testScan(234, true, 456, false, -1);
    testScan(345, false, 567, true, -1);
    testScan(456, false, 678, false, -1);
  }

  @Test
  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
    testReversedScan(998, true, 1, false, -1); // from last region to first region
    testReversedScan(543, true, 321, true, -1);
    testReversedScan(654, true, 432, false, -1);
    testReversedScan(765, false, 543, true, -1);
    testReversedScan(876, false, 654, false, -1);
  }

  @Test
  public void testScanAtRegionBoundary() throws Exception {
    testScan(222, true, 333, true, -1);
    testScan(333, true, 444, false, -1);
    testScan(444, false, 555, true, -1);
    testScan(555, false, 666, false, -1);
  }

  @Test
  public void testReversedScanAtRegionBoundary() throws Exception {
    testReversedScan(333, true, 222, true, -1);
    testReversedScan(444, true, 333, false, -1);
    testReversedScan(555, false, 444, true, -1);
    testReversedScan(666, false, 555, false, -1);
  }

  @Test
  public void testScanWithLimit() throws Exception {
    testScan(1, true, 998, false, 900); // from first region to last region
    testScan(123, true, 234, true, 100);
    testScan(234, true, 456, false, 100);
    testScan(345, false, 567, true, 100);
    testScan(456, false, 678, false, 100);
  }

  @Test
  public void testScanWithLimitGreaterThanActualCount() throws Exception {
    testScan(1, true, 998, false, 1000); // from first region to last region
    testScan(123, true, 345, true, 200);
    testScan(234, true, 456, false, 200);
    testScan(345, false, 567, true, 200);
    testScan(456, false, 678, false, 200);
  }

  @Test
  public void testReversedScanWithLimit() throws Exception {
    testReversedScan(998, true, 1, false, 900); // from last region to first region
    testReversedScan(543, true, 321, true, 100);
    testReversedScan(654, true, 432, false, 100);
    testReversedScan(765, false, 543, true, 100);
    testReversedScan(876, false, 654, false, 100);
  }

  @Test
  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
    testReversedScan(998, true, 1, false, 1000); // from last region to first region
    testReversedScan(543, true, 321, true, 200);
    testReversedScan(654, true, 432, false, 200);
    testReversedScan(765, false, 543, true, 200);
    testReversedScan(876, false, 654, false, 200);
  }
}