001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.anyOf;
024import static org.hamcrest.Matchers.containsString;
025import static org.hamcrest.Matchers.endsWith;
026import static org.hamcrest.Matchers.hasItem;
027import static org.hamcrest.Matchers.hasProperty;
028import static org.hamcrest.Matchers.is;
029import static org.hamcrest.Matchers.isA;
030import static org.junit.jupiter.api.Assertions.assertEquals;
031import static org.junit.jupiter.api.Assertions.assertThrows;
032import static org.junit.jupiter.api.Assertions.fail;
033
034import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
035import io.opentelemetry.sdk.trace.data.SpanData;
036import java.io.IOException;
037import java.io.UncheckedIOException;
038import java.util.Arrays;
039import java.util.List;
040import java.util.Objects;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ForkJoinPool;
043import java.util.concurrent.TimeUnit;
044import java.util.function.Supplier;
045import java.util.stream.Collectors;
046import java.util.stream.IntStream;
047import java.util.stream.Stream;
048import org.apache.hadoop.hbase.HBaseTestingUtil;
049import org.apache.hadoop.hbase.MatcherPredicate;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
052import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
053import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
054import org.apache.hadoop.hbase.trace.TraceUtil;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.JVMClusterUtil;
057import org.apache.hadoop.hbase.util.Pair;
058import org.hamcrest.Matcher;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.BeforeAll;
061import org.junit.jupiter.api.BeforeEach;
062import org.junit.jupiter.api.TestInfo;
063import org.junit.jupiter.api.TestTemplate;
064import org.junit.jupiter.api.extension.RegisterExtension;
065import org.junit.jupiter.params.provider.Arguments;
066
067import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
068
069public abstract class AbstractTestAsyncTableScan {
070
071  @RegisterExtension
072  protected static final OpenTelemetryExtension OTEL_EXT = OpenTelemetryExtension.create();
073
074  protected static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
075
076  protected static AsyncConnection CONN;
077
078  protected String methodName;
079
080  @BeforeAll
081  public static void setUpBeforeClass() throws Exception {
082    UTIL.startMiniCluster(3);
083    byte[][] splitKeys = new byte[8][];
084    for (int i = 111; i < 999; i += 111) {
085      splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
086    }
087    UTIL.createTable(TABLE_NAME, FAMILY, splitKeys);
088    UTIL.waitTableAvailable(TABLE_NAME);
089    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
090      table.put(IntStream.range(0, COUNT)
091        .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
092          .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
093        .collect(Collectors.toList()));
094    }
095    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
096  }
097
098  @AfterAll
099  public static void tearDownAfterClass() throws Exception {
100    Closeables.close(CONN, true);
101    UTIL.shutdownMiniCluster();
102  }
103
104  @BeforeEach
105  public void setUp(TestInfo testInfo) {
106    methodName = testInfo.getTestMethod().get().getName();
107  }
108
109  protected static TableName TABLE_NAME = TableName.valueOf("async");
110
111  protected static byte[] FAMILY = Bytes.toBytes("cf");
112
113  protected static byte[] CQ1 = Bytes.toBytes("cq1");
114
115  protected static byte[] CQ2 = Bytes.toBytes("cq2");
116
117  protected static int COUNT = 1000;
118
119  private static Scan createNormalScan() {
120    return new Scan();
121  }
122
123  private static Scan createBatchScan() {
124    return new Scan().setBatch(1);
125  }
126
127  // set a small result size for testing flow control
128  private static Scan createSmallResultSizeScan() {
129    return new Scan().setMaxResultSize(1);
130  }
131
132  private static Scan createBatchSmallResultSizeScan() {
133    return new Scan().setBatch(1).setMaxResultSize(1);
134  }
135
136  private static AsyncTable<?> getRawTable() {
137    return CONN.getTable(TABLE_NAME);
138  }
139
140  private static AsyncTable<?> getTable() {
141    return CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
142  }
143
144  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
145    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
146      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
147      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
148      Pair.newPair("batchSmallResultSize",
149        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
150  }
151
152  protected static Stream<Arguments> getScanCreatorParams() {
153    return getScanCreator().stream().map(p -> Arguments.of(p.getFirst(), p.getSecond()));
154  }
155
156  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
157    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
158      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
159  }
160
161  protected static Stream<Arguments> getTableAndScanCreatorParams() {
162    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
163    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
164    return tableCreator.stream().flatMap(tp -> scanCreator.stream()
165      .map(sp -> Arguments.of(tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond())));
166  }
167
168  protected abstract Scan createScan();
169
170  protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
171
172  /**
173   * Used by implementation classes to assert the correctness of spans produced under test.
174   */
175  protected abstract void assertTraceContinuity();
176
177  /**
178   * Used by implementation classes to assert the correctness of spans having errors.
179   */
180  protected abstract void
181    assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher);
182
183  protected final List<Result> convertFromBatchResult(List<Result> results) {
184    assertEquals(0, results.size() % 2);
185    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
186      try {
187        return Result
188          .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
189      } catch (IOException e) {
190        throw new UncheckedIOException(e);
191      }
192    }).collect(Collectors.toList());
193  }
194
195  protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
196    UTIL.waitFor(TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
197      "Span for test failed to complete.", OTEL_EXT::getSpans, hasItem(parentSpanMatcher)));
198  }
199
200  protected static Stream<SpanData> spanStream() {
201    return OTEL_EXT.getSpans().stream().filter(Objects::nonNull);
202  }
203
204  @TestTemplate
205  public void testScanAll() throws Exception {
206    List<Result> results = doScan(createScan(), -1);
207    // make sure all scanners are closed at RS side
208    UTIL.getHBaseCluster().getRegionServerThreads().stream()
209      .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(rs -> {
210        int scannerCount = rs.getRSRpcServices().getScannersCount();
211        assertEquals(0, scannerCount,
212          "The scanner count of " + rs.getServerName() + " is " + scannerCount);
213      });
214    assertEquals(COUNT, results.size());
215    IntStream.range(0, COUNT).forEach(i -> {
216      Result result = results.get(i);
217      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
218      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
219    });
220  }
221
222  private void assertResultEquals(Result result, int i) {
223    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
224    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
225    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
226  }
227
228  @TestTemplate
229  public void testReversedScanAll() throws Exception {
230    List<Result> results =
231      TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), methodName);
232    assertEquals(COUNT, results.size());
233    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
234    assertTraceContinuity();
235  }
236
237  @TestTemplate
238  public void testScanNoStopKey() throws Exception {
239    int start = 345;
240    List<Result> results = TraceUtil.trace(
241      () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
242      methodName);
243    assertEquals(COUNT - start, results.size());
244    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
245    assertTraceContinuity();
246  }
247
248  @TestTemplate
249  public void testReverseScanNoStopKey() throws Exception {
250    int start = 765;
251    final Scan scan =
252      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
253    List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), methodName);
254    assertEquals(start + 1, results.size());
255    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
256    assertTraceContinuity();
257  }
258
259  @TestTemplate
260  public void testScanWrongColumnFamily() {
261    final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace(
262      () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), methodName));
263    // hamcrest generic enforcement for `anyOf` is a pain; skip it
264    // but -- don't we always unwrap ExecutionExceptions -- bug?
265    if (e instanceof NoSuchColumnFamilyException) {
266      final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
267      assertThat(ex, isA(NoSuchColumnFamilyException.class));
268    } else if (e instanceof ExecutionException) {
269      final ExecutionException ex = (ExecutionException) e;
270      assertThat(ex, allOf(isA(ExecutionException.class),
271        hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
272    } else {
273      fail("Found unexpected Exception " + e);
274    }
275    assertTraceError(anyOf(
276      containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE),
277        endsWith(NoSuchColumnFamilyException.class.getName())),
278      allOf(
279        containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE),
280          endsWith(RemoteWithExtrasException.class.getName())),
281        containsEntry(is(HBaseSemanticAttributes.EXCEPTION_MESSAGE),
282          containsString(NoSuchColumnFamilyException.class.getName())))));
283  }
284
285  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
286    int limit) throws Exception {
287    testScan(start, startInclusive, stop, stopInclusive, limit, -1);
288  }
289
290  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
291    int limit, int closeAfter) throws Exception {
292    Scan scan =
293      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
294        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
295    if (limit > 0) {
296      scan.setLimit(limit);
297    }
298    List<Result> results = doScan(scan, closeAfter);
299    int actualStart = startInclusive ? start : start + 1;
300    int actualStop = stopInclusive ? stop + 1 : stop;
301    int count = actualStop - actualStart;
302    if (limit > 0) {
303      count = Math.min(count, limit);
304    }
305    if (closeAfter > 0) {
306      count = Math.min(count, closeAfter);
307    }
308    assertEquals(count, results.size());
309    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
310  }
311
312  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
313    int limit) throws Exception {
314    Scan scan =
315      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
316        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
317    if (limit > 0) {
318      scan.setLimit(limit);
319    }
320    List<Result> results = doScan(scan, -1);
321    int actualStart = startInclusive ? start : start - 1;
322    int actualStop = stopInclusive ? stop - 1 : stop;
323    int count = actualStart - actualStop;
324    if (limit > 0) {
325      count = Math.min(count, limit);
326    }
327    assertEquals(count, results.size());
328    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
329  }
330
331  @TestTemplate
332  public void testScanWithStartKeyAndStopKey() throws Exception {
333    testScan(1, true, 998, false, -1); // from first region to last region
334    testScan(123, true, 345, true, -1);
335    testScan(234, true, 456, false, -1);
336    testScan(345, false, 567, true, -1);
337    testScan(456, false, 678, false, -1);
338  }
339
340  @TestTemplate
341  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
342    testReversedScan(998, true, 1, false, -1); // from last region to first region
343    testReversedScan(543, true, 321, true, -1);
344    testReversedScan(654, true, 432, false, -1);
345    testReversedScan(765, false, 543, true, -1);
346    testReversedScan(876, false, 654, false, -1);
347  }
348
349  @TestTemplate
350  public void testScanAtRegionBoundary() throws Exception {
351    testScan(222, true, 333, true, -1);
352    testScan(333, true, 444, false, -1);
353    testScan(444, false, 555, true, -1);
354    testScan(555, false, 666, false, -1);
355  }
356
357  @TestTemplate
358  public void testReversedScanAtRegionBoundary() throws Exception {
359    testReversedScan(333, true, 222, true, -1);
360    testReversedScan(444, true, 333, false, -1);
361    testReversedScan(555, false, 444, true, -1);
362    testReversedScan(666, false, 555, false, -1);
363  }
364
365  @TestTemplate
366  public void testScanWithLimit() throws Exception {
367    testScan(1, true, 998, false, 900); // from first region to last region
368    testScan(123, true, 234, true, 100);
369    testScan(234, true, 456, false, 100);
370    testScan(345, false, 567, true, 100);
371    testScan(456, false, 678, false, 100);
372  }
373
374  @TestTemplate
375  public void testScanWithLimitGreaterThanActualCount() throws Exception {
376    testScan(1, true, 998, false, 1000); // from first region to last region
377    testScan(123, true, 345, true, 200);
378    testScan(234, true, 456, false, 200);
379    testScan(345, false, 567, true, 200);
380    testScan(456, false, 678, false, 200);
381  }
382
383  @TestTemplate
384  public void testReversedScanWithLimit() throws Exception {
385    testReversedScan(998, true, 1, false, 900); // from last region to first region
386    testReversedScan(543, true, 321, true, 100);
387    testReversedScan(654, true, 432, false, 100);
388    testReversedScan(765, false, 543, true, 100);
389    testReversedScan(876, false, 654, false, 100);
390  }
391
392  @TestTemplate
393  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
394    testReversedScan(998, true, 1, false, 1000); // from last region to first region
395    testReversedScan(543, true, 321, true, 200);
396    testReversedScan(654, true, 432, false, 200);
397    testReversedScan(765, false, 543, true, 200);
398    testReversedScan(876, false, 654, false, 200);
399  }
400
401  @TestTemplate
402  public void testScanEndingEarly() throws Exception {
403    testScan(1, true, 998, false, 0, 900); // from first region to last region
404    testScan(123, true, 234, true, 0, 100);
405    testScan(234, true, 456, false, 0, 100);
406    testScan(345, false, 567, true, 0, 100);
407    testScan(456, false, 678, false, 0, 100);
408  }
409}