001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.allOf; 023import static org.hamcrest.Matchers.anyOf; 024import static org.hamcrest.Matchers.containsString; 025import static org.hamcrest.Matchers.endsWith; 026import static org.hamcrest.Matchers.hasItem; 027import static org.hamcrest.Matchers.hasProperty; 028import static org.hamcrest.Matchers.is; 029import static org.hamcrest.Matchers.isA; 030import static org.junit.Assert.assertEquals; 031import static org.junit.Assert.assertThrows; 032import static org.junit.Assert.fail; 033 034import io.opentelemetry.sdk.trace.data.SpanData; 035import java.io.IOException; 036import java.io.UncheckedIOException; 037import java.util.Arrays; 038import java.util.List; 039import java.util.Objects; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.ForkJoinPool; 042import java.util.concurrent.TimeUnit; 043import java.util.function.Supplier; 044import java.util.stream.Collectors; 045import java.util.stream.IntStream; 046import java.util.stream.Stream; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.hbase.ConnectionRule; 049import org.apache.hadoop.hbase.HBaseTestingUtil; 050import org.apache.hadoop.hbase.MatcherPredicate; 051import org.apache.hadoop.hbase.MiniClusterRule; 052import org.apache.hadoop.hbase.StartTestingClusterOption; 053import org.apache.hadoop.hbase.TableName; 054import org.apache.hadoop.hbase.Waiter; 055import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 056import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 057import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; 058import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; 059import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; 060import org.apache.hadoop.hbase.trace.TraceUtil; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.JVMClusterUtil; 063import org.apache.hadoop.hbase.util.Pair; 064import org.hamcrest.Matcher; 065import org.junit.ClassRule; 066import org.junit.Rule; 067import org.junit.Test; 068import org.junit.rules.ExternalResource; 069import org.junit.rules.RuleChain; 070import org.junit.rules.TestName; 071import org.junit.rules.TestRule; 072 073public abstract class AbstractTestAsyncTableScan { 074 075 protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create(); 076 077 private static Configuration createConfiguration() { 078 Configuration conf = new Configuration(); 079 // Disable directory sharing to prevent race conditions when tests run in parallel. 080 // Each test instance gets its own isolated directories to avoid one test's tearDown() 081 // deleting directories another parallel test is still using. 082 conf.setBoolean("hbase.test.disable-directory-sharing", true); 083 return conf; 084 } 085 086 protected static final MiniClusterRule MINI_CLUSTER_RULE = 087 MiniClusterRule.newBuilder().setConfiguration(createConfiguration()) 088 .setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build(); 089 090 protected static final ConnectionRule CONN_RULE = 091 ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection); 092 093 private static final class Setup extends ExternalResource { 094 @Override 095 protected void before() throws Throwable { 096 final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility(); 097 final AsyncConnection conn = CONN_RULE.getAsyncConnection(); 098 099 byte[][] splitKeys = new byte[8][]; 100 for (int i = 111; i < 999; i += 111) { 101 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 102 } 103 testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); 104 testingUtil.waitTableAvailable(TABLE_NAME); 105 conn.getTable(TABLE_NAME) 106 .putAll(IntStream.range(0, COUNT) 107 .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 108 .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) 109 .collect(Collectors.toList())) 110 .get(); 111 } 112 } 113 114 @ClassRule 115 public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE) 116 .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup()); 117 118 @Rule 119 public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE); 120 121 @Rule 122 public final TestName testName = new TestName(); 123 124 protected static TableName TABLE_NAME = TableName.valueOf("async"); 125 126 protected static byte[] FAMILY = Bytes.toBytes("cf"); 127 128 protected static byte[] CQ1 = Bytes.toBytes("cq1"); 129 130 protected static byte[] CQ2 = Bytes.toBytes("cq2"); 131 132 protected static int COUNT = 1000; 133 134 private static Scan createNormalScan() { 135 return new Scan(); 136 } 137 138 private static Scan createBatchScan() { 139 return new Scan().setBatch(1); 140 } 141 142 // set a small result size for testing flow control 143 private static Scan createSmallResultSizeScan() { 144 return new Scan().setMaxResultSize(1); 145 } 146 147 private static Scan createBatchSmallResultSizeScan() { 148 return new Scan().setBatch(1).setMaxResultSize(1); 149 } 150 151 private static AsyncTable<?> getRawTable() { 152 return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME); 153 } 154 155 private static AsyncTable<?> getTable() { 156 return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); 157 } 158 159 private static List<Pair<String, Supplier<Scan>>> getScanCreator() { 160 return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan), 161 Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan), 162 Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan), 163 Pair.newPair("batchSmallResultSize", 164 AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); 165 } 166 167 protected static List<Object[]> getScanCreatorParams() { 168 return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) 169 .collect(Collectors.toList()); 170 } 171 172 private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { 173 return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), 174 Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); 175 } 176 177 protected static List<Object[]> getTableAndScanCreatorParams() { 178 List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); 179 List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); 180 return tableCreator.stream() 181 .flatMap(tp -> scanCreator.stream() 182 .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() })) 183 .collect(Collectors.toList()); 184 } 185 186 protected abstract Scan createScan(); 187 188 protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception; 189 190 /** 191 * Used by implementation classes to assert the correctness of spans produced under test. 192 */ 193 protected abstract void assertTraceContinuity(); 194 195 /** 196 * Used by implementation classes to assert the correctness of spans having errors. 197 */ 198 protected abstract void 199 assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher); 200 201 protected final List<Result> convertFromBatchResult(List<Result> results) { 202 assertEquals(0, results.size() % 2); 203 return IntStream.range(0, results.size() / 2).mapToObj(i -> { 204 try { 205 return Result 206 .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1))); 207 } catch (IOException e) { 208 throw new UncheckedIOException(e); 209 } 210 }).collect(Collectors.toList()); 211 } 212 213 protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { 214 final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration(); 215 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( 216 "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher))); 217 } 218 219 protected static Stream<SpanData> spanStream() { 220 return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull); 221 } 222 223 @Test 224 public void testScanAll() throws Exception { 225 List<Result> results = doScan(createScan(), -1); 226 // make sure all scanners are closed at RS side 227 MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() 228 .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach( 229 rs -> assertEquals( 230 "The scanner count of " + rs.getServerName() + " is " 231 + rs.getRSRpcServices().getScannersCount(), 232 0, rs.getRSRpcServices().getScannersCount())); 233 assertEquals(COUNT, results.size()); 234 IntStream.range(0, COUNT).forEach(i -> { 235 Result result = results.get(i); 236 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 237 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 238 }); 239 } 240 241 private void assertResultEquals(Result result, int i) { 242 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 243 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 244 assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); 245 } 246 247 @Test 248 public void testReversedScanAll() throws Exception { 249 List<Result> results = 250 TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); 251 assertEquals(COUNT, results.size()); 252 IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); 253 assertTraceContinuity(); 254 } 255 256 @Test 257 public void testScanNoStopKey() throws Exception { 258 int start = 345; 259 List<Result> results = TraceUtil.trace( 260 () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), 261 testName.getMethodName()); 262 assertEquals(COUNT - start, results.size()); 263 IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); 264 assertTraceContinuity(); 265 } 266 267 @Test 268 public void testReverseScanNoStopKey() throws Exception { 269 int start = 765; 270 final Scan scan = 271 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true); 272 List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); 273 assertEquals(start + 1, results.size()); 274 IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); 275 assertTraceContinuity(); 276 } 277 278 @Test 279 public void testScanWrongColumnFamily() { 280 final Exception e = assertThrows(Exception.class, 281 () -> TraceUtil.trace( 282 () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), 283 testName.getMethodName())); 284 // hamcrest generic enforcement for `anyOf` is a pain; skip it 285 // but -- don't we always unwrap ExecutionExceptions -- bug? 286 if (e instanceof NoSuchColumnFamilyException) { 287 final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; 288 assertThat(ex, isA(NoSuchColumnFamilyException.class)); 289 } else if (e instanceof ExecutionException) { 290 final ExecutionException ex = (ExecutionException) e; 291 assertThat(ex, allOf(isA(ExecutionException.class), 292 hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); 293 } else { 294 fail("Found unexpected Exception " + e); 295 } 296 assertTraceError(anyOf( 297 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE), 298 endsWith(NoSuchColumnFamilyException.class.getName())), 299 allOf( 300 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE), 301 endsWith(RemoteWithExtrasException.class.getName())), 302 containsEntry(is(HBaseSemanticAttributes.EXCEPTION_MESSAGE), 303 containsString(NoSuchColumnFamilyException.class.getName()))))); 304 } 305 306 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 307 int limit) throws Exception { 308 testScan(start, startInclusive, stop, stopInclusive, limit, -1); 309 } 310 311 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 312 int limit, int closeAfter) throws Exception { 313 Scan scan = 314 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 315 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); 316 if (limit > 0) { 317 scan.setLimit(limit); 318 } 319 List<Result> results = doScan(scan, closeAfter); 320 int actualStart = startInclusive ? start : start + 1; 321 int actualStop = stopInclusive ? stop + 1 : stop; 322 int count = actualStop - actualStart; 323 if (limit > 0) { 324 count = Math.min(count, limit); 325 } 326 if (closeAfter > 0) { 327 count = Math.min(count, closeAfter); 328 } 329 assertEquals(count, results.size()); 330 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); 331 } 332 333 private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 334 int limit) throws Exception { 335 Scan scan = 336 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 337 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); 338 if (limit > 0) { 339 scan.setLimit(limit); 340 } 341 List<Result> results = doScan(scan, -1); 342 int actualStart = startInclusive ? start : start - 1; 343 int actualStop = stopInclusive ? stop - 1 : stop; 344 int count = actualStart - actualStop; 345 if (limit > 0) { 346 count = Math.min(count, limit); 347 } 348 assertEquals(count, results.size()); 349 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); 350 } 351 352 @Test 353 public void testScanWithStartKeyAndStopKey() throws Exception { 354 testScan(1, true, 998, false, -1); // from first region to last region 355 testScan(123, true, 345, true, -1); 356 testScan(234, true, 456, false, -1); 357 testScan(345, false, 567, true, -1); 358 testScan(456, false, 678, false, -1); 359 } 360 361 @Test 362 public void testReversedScanWithStartKeyAndStopKey() throws Exception { 363 testReversedScan(998, true, 1, false, -1); // from last region to first region 364 testReversedScan(543, true, 321, true, -1); 365 testReversedScan(654, true, 432, false, -1); 366 testReversedScan(765, false, 543, true, -1); 367 testReversedScan(876, false, 654, false, -1); 368 } 369 370 @Test 371 public void testScanAtRegionBoundary() throws Exception { 372 testScan(222, true, 333, true, -1); 373 testScan(333, true, 444, false, -1); 374 testScan(444, false, 555, true, -1); 375 testScan(555, false, 666, false, -1); 376 } 377 378 @Test 379 public void testReversedScanAtRegionBoundary() throws Exception { 380 testReversedScan(333, true, 222, true, -1); 381 testReversedScan(444, true, 333, false, -1); 382 testReversedScan(555, false, 444, true, -1); 383 testReversedScan(666, false, 555, false, -1); 384 } 385 386 @Test 387 public void testScanWithLimit() throws Exception { 388 testScan(1, true, 998, false, 900); // from first region to last region 389 testScan(123, true, 234, true, 100); 390 testScan(234, true, 456, false, 100); 391 testScan(345, false, 567, true, 100); 392 testScan(456, false, 678, false, 100); 393 } 394 395 @Test 396 public void testScanWithLimitGreaterThanActualCount() throws Exception { 397 testScan(1, true, 998, false, 1000); // from first region to last region 398 testScan(123, true, 345, true, 200); 399 testScan(234, true, 456, false, 200); 400 testScan(345, false, 567, true, 200); 401 testScan(456, false, 678, false, 200); 402 } 403 404 @Test 405 public void testReversedScanWithLimit() throws Exception { 406 testReversedScan(998, true, 1, false, 900); // from last region to first region 407 testReversedScan(543, true, 321, true, 100); 408 testReversedScan(654, true, 432, false, 100); 409 testReversedScan(765, false, 543, true, 100); 410 testReversedScan(876, false, 654, false, 100); 411 } 412 413 @Test 414 public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { 415 testReversedScan(998, true, 1, false, 1000); // from last region to first region 416 testReversedScan(543, true, 321, true, 200); 417 testReversedScan(654, true, 432, false, 200); 418 testReversedScan(765, false, 543, true, 200); 419 testReversedScan(876, false, 654, false, 200); 420 } 421 422 @Test 423 public void testScanEndingEarly() throws Exception { 424 testScan(1, true, 998, false, 0, 900); // from first region to last region 425 testScan(123, true, 234, true, 0, 100); 426 testScan(234, true, 456, false, 0, 100); 427 testScan(345, false, 567, true, 0, 100); 428 testScan(456, false, 678, false, 0, 100); 429 } 430}