001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.allOf; 023import static org.hamcrest.Matchers.anyOf; 024import static org.hamcrest.Matchers.containsString; 025import static org.hamcrest.Matchers.endsWith; 026import static org.hamcrest.Matchers.hasItem; 027import static org.hamcrest.Matchers.hasProperty; 028import static org.hamcrest.Matchers.is; 029import static org.hamcrest.Matchers.isA; 030import static org.junit.Assert.assertEquals; 031import static org.junit.Assert.assertThrows; 032import static org.junit.Assert.fail; 033 034import io.opentelemetry.sdk.trace.data.SpanData; 035import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; 036import java.io.IOException; 037import java.io.UncheckedIOException; 038import java.util.Arrays; 039import java.util.List; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.ForkJoinPool; 042import java.util.concurrent.TimeUnit; 043import java.util.function.Supplier; 044import java.util.stream.Collectors; 045import java.util.stream.IntStream; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.ConnectionRule; 048import org.apache.hadoop.hbase.HBaseTestingUtility; 049import org.apache.hadoop.hbase.MatcherPredicate; 050import org.apache.hadoop.hbase.MiniClusterRule; 051import org.apache.hadoop.hbase.StartMiniClusterOption; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.Waiter; 054import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException; 055import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; 056import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; 057import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; 058import org.apache.hadoop.hbase.trace.TraceUtil; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.JVMClusterUtil; 061import org.apache.hadoop.hbase.util.Pair; 062import org.hamcrest.Matcher; 063import org.junit.ClassRule; 064import org.junit.Rule; 065import org.junit.Test; 066import org.junit.rules.ExternalResource; 067import org.junit.rules.RuleChain; 068import org.junit.rules.TestName; 069import org.junit.rules.TestRule; 070 071public abstract class AbstractTestAsyncTableScan { 072 073 protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); 074 protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder() 075 .setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build(); 076 077 protected static final ConnectionRule connectionRule = 078 ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); 079 080 private static final class Setup extends ExternalResource { 081 @Override 082 protected void before() throws Throwable { 083 final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility(); 084 final AsyncConnection conn = connectionRule.getAsyncConnection(); 085 086 byte[][] splitKeys = new byte[8][]; 087 for (int i = 111; i < 999; i += 111) { 088 splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); 089 } 090 testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); 091 testingUtil.waitTableAvailable(TABLE_NAME); 092 conn.getTable(TABLE_NAME) 093 .putAll(IntStream.range(0, COUNT) 094 .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) 095 .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) 096 .collect(Collectors.toList())) 097 .get(); 098 } 099 } 100 101 @ClassRule 102 public static final TestRule classRule = RuleChain.outerRule(otelClassRule) 103 .around(miniClusterRule).around(connectionRule).around(new Setup()); 104 105 @Rule 106 public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); 107 108 @Rule 109 public final TestName testName = new TestName(); 110 111 protected static TableName TABLE_NAME = TableName.valueOf("async"); 112 113 protected static byte[] FAMILY = Bytes.toBytes("cf"); 114 115 protected static byte[] CQ1 = Bytes.toBytes("cq1"); 116 117 protected static byte[] CQ2 = Bytes.toBytes("cq2"); 118 119 protected static int COUNT = 1000; 120 121 private static Scan createNormalScan() { 122 return new Scan(); 123 } 124 125 private static Scan createBatchScan() { 126 return new Scan().setBatch(1); 127 } 128 129 // set a small result size for testing flow control 130 private static Scan createSmallResultSizeScan() { 131 return new Scan().setMaxResultSize(1); 132 } 133 134 private static Scan createBatchSmallResultSizeScan() { 135 return new Scan().setBatch(1).setMaxResultSize(1); 136 } 137 138 private static AsyncTable<?> getRawTable() { 139 return connectionRule.getAsyncConnection().getTable(TABLE_NAME); 140 } 141 142 private static AsyncTable<?> getTable() { 143 return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); 144 } 145 146 private static List<Pair<String, Supplier<Scan>>> getScanCreator() { 147 return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan), 148 Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan), 149 Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan), 150 Pair.newPair("batchSmallResultSize", 151 AbstractTestAsyncTableScan::createBatchSmallResultSizeScan)); 152 } 153 154 protected static List<Object[]> getScanCreatorParams() { 155 return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() }) 156 .collect(Collectors.toList()); 157 } 158 159 private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() { 160 return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable), 161 Pair.newPair("normal", AbstractTestAsyncTableScan::getTable)); 162 } 163 164 protected static List<Object[]> getTableAndScanCreatorParams() { 165 List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator(); 166 List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator(); 167 return tableCreator.stream() 168 .flatMap(tp -> scanCreator.stream() 169 .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() })) 170 .collect(Collectors.toList()); 171 } 172 173 protected abstract Scan createScan(); 174 175 protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception; 176 177 /** 178 * Used by implementation classes to assert the correctness of spans produced under test. 179 */ 180 protected abstract void assertTraceContinuity(); 181 182 /** 183 * Used by implementation classes to assert the correctness of spans having errors. 184 */ 185 protected abstract void 186 assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher); 187 188 protected final List<Result> convertFromBatchResult(List<Result> results) { 189 assertEquals(0, results.size() % 2); 190 return IntStream.range(0, results.size() / 2).mapToObj(i -> { 191 try { 192 return Result 193 .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1))); 194 } catch (IOException e) { 195 throw new UncheckedIOException(e); 196 } 197 }).collect(Collectors.toList()); 198 } 199 200 protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) { 201 final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration(); 202 Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( 203 "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher))); 204 } 205 206 @Test 207 public void testScanAll() throws Exception { 208 List<Result> results = doScan(createScan(), -1); 209 // make sure all scanners are closed at RS side 210 miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() 211 .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach( 212 rs -> assertEquals( 213 "The scanner count of " + rs.getServerName() + " is " 214 + rs.getRSRpcServices().getScannersCount(), 215 0, rs.getRSRpcServices().getScannersCount())); 216 assertEquals(COUNT, results.size()); 217 IntStream.range(0, COUNT).forEach(i -> { 218 Result result = results.get(i); 219 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 220 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 221 }); 222 } 223 224 private void assertResultEquals(Result result, int i) { 225 assertEquals(String.format("%03d", i), Bytes.toString(result.getRow())); 226 assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1))); 227 assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2))); 228 } 229 230 @Test 231 public void testReversedScanAll() throws Exception { 232 List<Result> results = 233 TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); 234 assertEquals(COUNT, results.size()); 235 IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); 236 assertTraceContinuity(); 237 } 238 239 @Test 240 public void testScanNoStopKey() throws Exception { 241 int start = 345; 242 List<Result> results = TraceUtil.trace( 243 () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), 244 testName.getMethodName()); 245 assertEquals(COUNT - start, results.size()); 246 IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); 247 assertTraceContinuity(); 248 } 249 250 @Test 251 public void testReverseScanNoStopKey() throws Exception { 252 int start = 765; 253 final Scan scan = 254 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true); 255 List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); 256 assertEquals(start + 1, results.size()); 257 IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); 258 assertTraceContinuity(); 259 } 260 261 @Test 262 public void testScanWrongColumnFamily() { 263 final Exception e = assertThrows(Exception.class, 264 () -> TraceUtil.trace( 265 () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), 266 testName.getMethodName())); 267 // hamcrest generic enforcement for `anyOf` is a pain; skip it 268 // but -- don't we always unwrap ExecutionExceptions -- bug? 269 if (e instanceof NoSuchColumnFamilyException) { 270 final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; 271 assertThat(ex, isA(NoSuchColumnFamilyException.class)); 272 } else if (e instanceof ExecutionException) { 273 final ExecutionException ex = (ExecutionException) e; 274 assertThat(ex, allOf(isA(ExecutionException.class), 275 hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); 276 } else { 277 fail("Found unexpected Exception " + e); 278 } 279 assertTraceError(anyOf( 280 containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), 281 endsWith(NoSuchColumnFamilyException.class.getName())), 282 allOf( 283 containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), 284 endsWith(RemoteWithExtrasException.class.getName())), 285 containsEntry(is(SemanticAttributes.EXCEPTION_MESSAGE), 286 containsString(NoSuchColumnFamilyException.class.getName()))))); 287 } 288 289 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 290 int limit) throws Exception { 291 testScan(start, startInclusive, stop, stopInclusive, limit, -1); 292 } 293 294 private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 295 int limit, int closeAfter) throws Exception { 296 Scan scan = 297 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 298 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive); 299 if (limit > 0) { 300 scan.setLimit(limit); 301 } 302 List<Result> results = doScan(scan, closeAfter); 303 int actualStart = startInclusive ? start : start + 1; 304 int actualStop = stopInclusive ? stop + 1 : stop; 305 int count = actualStop - actualStart; 306 if (limit > 0) { 307 count = Math.min(count, limit); 308 } 309 if (closeAfter > 0) { 310 count = Math.min(count, closeAfter); 311 } 312 assertEquals(count, results.size()); 313 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i)); 314 } 315 316 private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive, 317 int limit) throws Exception { 318 Scan scan = 319 createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive) 320 .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true); 321 if (limit > 0) { 322 scan.setLimit(limit); 323 } 324 List<Result> results = doScan(scan, -1); 325 int actualStart = startInclusive ? start : start - 1; 326 int actualStop = stopInclusive ? stop - 1 : stop; 327 int count = actualStart - actualStop; 328 if (limit > 0) { 329 count = Math.min(count, limit); 330 } 331 assertEquals(count, results.size()); 332 IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i)); 333 } 334 335 @Test 336 public void testScanWithStartKeyAndStopKey() throws Exception { 337 testScan(1, true, 998, false, -1); // from first region to last region 338 testScan(123, true, 345, true, -1); 339 testScan(234, true, 456, false, -1); 340 testScan(345, false, 567, true, -1); 341 testScan(456, false, 678, false, -1); 342 } 343 344 @Test 345 public void testReversedScanWithStartKeyAndStopKey() throws Exception { 346 testReversedScan(998, true, 1, false, -1); // from last region to first region 347 testReversedScan(543, true, 321, true, -1); 348 testReversedScan(654, true, 432, false, -1); 349 testReversedScan(765, false, 543, true, -1); 350 testReversedScan(876, false, 654, false, -1); 351 } 352 353 @Test 354 public void testScanAtRegionBoundary() throws Exception { 355 testScan(222, true, 333, true, -1); 356 testScan(333, true, 444, false, -1); 357 testScan(444, false, 555, true, -1); 358 testScan(555, false, 666, false, -1); 359 } 360 361 @Test 362 public void testReversedScanAtRegionBoundary() throws Exception { 363 testReversedScan(333, true, 222, true, -1); 364 testReversedScan(444, true, 333, false, -1); 365 testReversedScan(555, false, 444, true, -1); 366 testReversedScan(666, false, 555, false, -1); 367 } 368 369 @Test 370 public void testScanWithLimit() throws Exception { 371 testScan(1, true, 998, false, 900); // from first region to last region 372 testScan(123, true, 234, true, 100); 373 testScan(234, true, 456, false, 100); 374 testScan(345, false, 567, true, 100); 375 testScan(456, false, 678, false, 100); 376 } 377 378 @Test 379 public void testScanWithLimitGreaterThanActualCount() throws Exception { 380 testScan(1, true, 998, false, 1000); // from first region to last region 381 testScan(123, true, 345, true, 200); 382 testScan(234, true, 456, false, 200); 383 testScan(345, false, 567, true, 200); 384 testScan(456, false, 678, false, 200); 385 } 386 387 @Test 388 public void testReversedScanWithLimit() throws Exception { 389 testReversedScan(998, true, 1, false, 900); // from last region to first region 390 testReversedScan(543, true, 321, true, 100); 391 testReversedScan(654, true, 432, false, 100); 392 testReversedScan(765, false, 543, true, 100); 393 testReversedScan(876, false, 654, false, 100); 394 } 395 396 @Test 397 public void testReversedScanWithLimitGreaterThanActualCount() throws Exception { 398 testReversedScan(998, true, 1, false, 1000); // from last region to first region 399 testReversedScan(543, true, 321, true, 200); 400 testReversedScan(654, true, 432, false, 200); 401 testReversedScan(765, false, 543, true, 200); 402 testReversedScan(876, false, 654, false, 200); 403 } 404 405 @Test 406 public void testScanEndingEarly() throws Exception { 407 testScan(1, true, 998, false, 0, 900); // from first region to last region 408 testScan(123, true, 234, true, 0, 100); 409 testScan(234, true, 456, false, 0, 100); 410 testScan(345, false, 567, true, 0, 100); 411 testScan(456, false, 678, false, 0, 100); 412 } 413}