001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.allOf; 023import static org.hamcrest.Matchers.anyOf; 024import static org.hamcrest.Matchers.containsString; 025import static org.hamcrest.Matchers.endsWith; 026import static org.hamcrest.Matchers.hasItem; 027import static org.hamcrest.Matchers.hasProperty; 028import static org.hamcrest.Matchers.is; 029import static org.hamcrest.Matchers.isA; 030import static org.junit.jupiter.api.Assertions.assertEquals; 031import static org.junit.jupiter.api.Assertions.assertThrows; 032import static org.junit.jupiter.api.Assertions.fail; 033 034import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; 035import io.opentelemetry.sdk.trace.data.SpanData; 036import java.io.IOException; 037import java.io.UncheckedIOException; 038import java.util.Arrays; 039import java.util.List; 040import java.util.Objects; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ForkJoinPool; 043import java.util.concurrent.TimeUnit; 044import java.util.function.Supplier; 045import java.util.stream.Collectors; 046import java.util.stream.IntStream; 047import java.util.stream.Stream; 048import org.apache.hadoop.hbase.HBaseTestingUtil; 049import org.apache.hadoop.hbase.MatcherPredicate; 050import org.apache.hadoop.hbase.TableName; 051import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 052import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 053import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 054import org.apache.hadoop.hbase.trace.TraceUtil; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.util.JVMClusterUtil; 057import org.apache.hadoop.hbase.util.Pair; 058import org.hamcrest.Matcher; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.BeforeAll; 061import org.junit.jupiter.api.BeforeEach; 062import org.junit.jupiter.api.TestInfo; 063import org.junit.jupiter.api.TestTemplate; 064import org.junit.jupiter.api.extension.RegisterExtension; 065import org.junit.jupiter.params.provider.Arguments; 066 067import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 068 069public abstract class AbstractTestAsyncTableScan { 070 071 @RegisterExtension 072 protected static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create(); 073 074 protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 075 076 protected static AsyncConnection CONN; 077 078 protected String methodName; 079 080 @BeforeAll 081 public static void setUpBeforeClass() throws Exception { 082 UTIL.startMiniCluster(3); 083 byte[][] splitKeys = new byte[8][]; 084 for (int i = 111; i < 999; i += 111) { 085 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 086 } 087 UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); 088 UTIL.waitTableAvailable(TABLE_NAME); 089 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { 090 table.put(IntStream.range(0, COUNT) 091 .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 092 .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) 093 .collect(Collectors.toList())); 094 } 095 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 096 } 097 098 @AfterAll 099 public static void tearDownAfterClass() throws Exception { 100 Closeables.close(CONN, true); 101 UTIL.shutdownMiniCluster(); 102 } 103 104 @BeforeEach 105 public void setUp(TestInfo testInfo) { 106 methodName = testInfo.getTestMethod().get().getName(); 107 } 108 109 protected static TableName TABLE_NAME = TableName.valueOf("async"); 110 111 protected static byte[] FAMILY = Bytes.toBytes("cf"); 112 113 protected static byte[] CQ1 = Bytes.toBytes("cq1"); 114 115 protected static byte[] CQ2 = Bytes.toBytes("cq2"); 116 117 protected static int COUNT = 1000; 118 119 private static Scan createNormalScan() { 120 return new Scan(); 121 } 122 123 private static Scan createBatchScan() { 124 return new Scan().setBatch(1); 125 } 126 127 // set a small result size for testing flow control 128 private static Scan createSmallResultSizeScan() { 129 return new Scan().setMaxResultSize(1); 130 } 131 132 private static Scan createBatchSmallResultSizeScan() { 133 return new Scan().setBatch(1).setMaxResultSize(1); 134 } 135 136 private static AsyncTable<?> getRawTable() { 137 return CONN.getTable(TABLE_NAME); 138 } 139 140 private static AsyncTable<?> getTable() { 141 return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); 142 } 143 144 private static List<Pair<String, Supplier<Scan>>> getScanCreator() { 145 return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan), 146 Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan), 147 Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan), 148 Pair.newPair("batchSmallResultSize", 149 AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); 150 } 151 152 protected static Stream<Arguments> getScanCreatorParams() { 153 return getScanCreator().stream().map(p -> Arguments.of(p.getFirst(), p.getSecond())); 154 } 155 156 private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { 157 return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), 158 Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); 159 } 160 161 protected static Stream<Arguments> getTableAndScanCreatorParams() { 162 List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); 163 List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); 164 return tableCreator.stream().flatMap(tp -> scanCreator.stream() 165 .map(sp -> Arguments.of(tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond()))); 166 } 167 168 protected abstract Scan createScan(); 169 170 protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception; 171 172 /** 173 * Used by implementation classes to assert the correctness of spans produced under test. 174 */ 175 protected abstract void assertTraceContinuity(); 176 177 /** 178 * Used by implementation classes to assert the correctness of spans having errors. 179 */ 180 protected abstract void 181 assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher); 182 183 protected final List<Result> convertFromBatchResult(List<Result> results) { 184 assertEquals(0, results.size() % 2); 185 return IntStream.range(0, results.size() / 2).mapToObj(i -> { 186 try { 187 return Result 188 .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1))); 189 } catch (IOException e) { 190 throw new UncheckedIOException(e); 191 } 192 }).collect(Collectors.toList()); 193 } 194 195 protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { 196 UTIL.waitFor(TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( 197 "Span for test failed to complete.", OTEL_EXT::getSpans, hasItem(parentSpanMatcher))); 198 } 199 200 protected static Stream<SpanData> spanStream() { 201 return OTEL_EXT.getSpans().stream().filter(Objects::nonNull); 202 } 203 204 @TestTemplate 205 public void testScanAll() throws Exception { 206 List<Result> results = doScan(createScan(), -1); 207 // make sure all scanners are closed at RS side 208 UTIL.getHBaseCluster().getRegionServerThreads().stream() 209 .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(rs -> { 210 int scannerCount = rs.getRSRpcServices().getScannersCount(); 211 assertEquals(0, scannerCount, 212 "The scanner count of " + rs.getServerName() + " is " + scannerCount); 213 }); 214 assertEquals(COUNT, results.size()); 215 IntStream.range(0, COUNT).forEach(i -> { 216 Result result = results.get(i); 217 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 218 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 219 }); 220 } 221 222 private void assertResultEquals(Result result, int i) { 223 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 224 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 225 assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); 226 } 227 228 @TestTemplate 229 public void testReversedScanAll() throws Exception { 230 List<Result> results = 231 TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), methodName); 232 assertEquals(COUNT, results.size()); 233 IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); 234 assertTraceContinuity(); 235 } 236 237 @TestTemplate 238 public void testScanNoStopKey() throws Exception { 239 int start = 345; 240 List<Result> results = TraceUtil.trace( 241 () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), 242 methodName); 243 assertEquals(COUNT - start, results.size()); 244 IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); 245 assertTraceContinuity(); 246 } 247 248 @TestTemplate 249 public void testReverseScanNoStopKey() throws Exception { 250 int start = 765; 251 final Scan scan = 252 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true); 253 List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), methodName); 254 assertEquals(start + 1, results.size()); 255 IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); 256 assertTraceContinuity(); 257 } 258 259 @TestTemplate 260 public void testScanWrongColumnFamily() { 261 final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace( 262 () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), methodName)); 263 // hamcrest generic enforcement for `anyOf` is a pain; skip it 264 // but -- don't we always unwrap ExecutionExceptions -- bug? 265 if (e instanceof NoSuchColumnFamilyException) { 266 final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; 267 assertThat(ex, isA(NoSuchColumnFamilyException.class)); 268 } else if (e instanceof ExecutionException) { 269 final ExecutionException ex = (ExecutionException) e; 270 assertThat(ex, allOf(isA(ExecutionException.class), 271 hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); 272 } else { 273 fail("Found unexpected Exception " + e); 274 } 275 assertTraceError(anyOf( 276 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE), 277 endsWith(NoSuchColumnFamilyException.class.getName())), 278 allOf( 279 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE), 280 endsWith(RemoteWithExtrasException.class.getName())), 281 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_MESSAGE), 282 containsString(NoSuchColumnFamilyException.class.getName()))))); 283 } 284 285 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 286 int limit) throws Exception { 287 testScan(start, startInclusive, stop, stopInclusive, limit, -1); 288 } 289 290 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 291 int limit, int closeAfter) throws Exception { 292 Scan scan = 293 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 294 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); 295 if (limit > 0) { 296 scan.setLimit(limit); 297 } 298 List<Result> results = doScan(scan, closeAfter); 299 int actualStart = startInclusive ? start : start + 1; 300 int actualStop = stopInclusive ? stop + 1 : stop; 301 int count = actualStop - actualStart; 302 if (limit > 0) { 303 count = Math.min(count, limit); 304 } 305 if (closeAfter > 0) { 306 count = Math.min(count, closeAfter); 307 } 308 assertEquals(count, results.size()); 309 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); 310 } 311 312 private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 313 int limit) throws Exception { 314 Scan scan = 315 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 316 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); 317 if (limit > 0) { 318 scan.setLimit(limit); 319 } 320 List<Result> results = doScan(scan, -1); 321 int actualStart = startInclusive ? start : start - 1; 322 int actualStop = stopInclusive ? stop - 1 : stop; 323 int count = actualStart - actualStop; 324 if (limit > 0) { 325 count = Math.min(count, limit); 326 } 327 assertEquals(count, results.size()); 328 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); 329 } 330 331 @TestTemplate 332 public void testScanWithStartKeyAndStopKey() throws Exception { 333 testScan(1, true, 998, false, -1); // from first region to last region 334 testScan(123, true, 345, true, -1); 335 testScan(234, true, 456, false, -1); 336 testScan(345, false, 567, true, -1); 337 testScan(456, false, 678, false, -1); 338 } 339 340 @TestTemplate 341 public void testReversedScanWithStartKeyAndStopKey() throws Exception { 342 testReversedScan(998, true, 1, false, -1); // from last region to first region 343 testReversedScan(543, true, 321, true, -1); 344 testReversedScan(654, true, 432, false, -1); 345 testReversedScan(765, false, 543, true, -1); 346 testReversedScan(876, false, 654, false, -1); 347 } 348 349 @TestTemplate 350 public void testScanAtRegionBoundary() throws Exception { 351 testScan(222, true, 333, true, -1); 352 testScan(333, true, 444, false, -1); 353 testScan(444, false, 555, true, -1); 354 testScan(555, false, 666, false, -1); 355 } 356 357 @TestTemplate 358 public void testReversedScanAtRegionBoundary() throws Exception { 359 testReversedScan(333, true, 222, true, -1); 360 testReversedScan(444, true, 333, false, -1); 361 testReversedScan(555, false, 444, true, -1); 362 testReversedScan(666, false, 555, false, -1); 363 } 364 365 @TestTemplate 366 public void testScanWithLimit() throws Exception { 367 testScan(1, true, 998, false, 900); // from first region to last region 368 testScan(123, true, 234, true, 100); 369 testScan(234, true, 456, false, 100); 370 testScan(345, false, 567, true, 100); 371 testScan(456, false, 678, false, 100); 372 } 373 374 @TestTemplate 375 public void testScanWithLimitGreaterThanActualCount() throws Exception { 376 testScan(1, true, 998, false, 1000); // from first region to last region 377 testScan(123, true, 345, true, 200); 378 testScan(234, true, 456, false, 200); 379 testScan(345, false, 567, true, 200); 380 testScan(456, false, 678, false, 200); 381 } 382 383 @TestTemplate 384 public void testReversedScanWithLimit() throws Exception { 385 testReversedScan(998, true, 1, false, 900); // from last region to first region 386 testReversedScan(543, true, 321, true, 100); 387 testReversedScan(654, true, 432, false, 100); 388 testReversedScan(765, false, 543, true, 100); 389 testReversedScan(876, false, 654, false, 100); 390 } 391 392 @TestTemplate 393 public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { 394 testReversedScan(998, true, 1, false, 1000); // from last region to first region 395 testReversedScan(543, true, 321, true, 200); 396 testReversedScan(654, true, 432, false, 200); 397 testReversedScan(765, false, 543, true, 200); 398 testReversedScan(876, false, 654, false, 200); 399 } 400 401 @TestTemplate 402 public void testScanEndingEarly() throws Exception { 403 testScan(1, true, 998, false, 0, 900); // from first region to last region 404 testScan(123, true, 234, true, 0, 100); 405 testScan(234, true, 456, false, 0, 100); 406 testScan(345, false, 567, true, 0, 100); 407 testScan(456, false, 678, false, 0, 100); 408 } 409}