001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.allOf;
023import static org.hamcrest.Matchers.anyOf;
024import static org.hamcrest.Matchers.containsString;
025import static org.hamcrest.Matchers.endsWith;
026import static org.hamcrest.Matchers.hasItem;
027import static org.hamcrest.Matchers.hasProperty;
028import static org.hamcrest.Matchers.is;
029import static org.hamcrest.Matchers.isA;
030import static org.junit.Assert.assertEquals;
031import static org.junit.Assert.assertThrows;
032import static org.junit.Assert.fail;
033
034import io.opentelemetry.sdk.trace.data.SpanData;
035import java.io.IOException;
036import java.io.UncheckedIOException;
037import java.util.Arrays;
038import java.util.List;
039import java.util.Objects;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ForkJoinPool;
042import java.util.concurrent.TimeUnit;
043import java.util.function.Supplier;
044import java.util.stream.Collectors;
045import java.util.stream.IntStream;
046import java.util.stream.Stream;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.hbase.ConnectionRule;
049import org.apache.hadoop.hbase.HBaseTestingUtil;
050import org.apache.hadoop.hbase.MatcherPredicate;
051import org.apache.hadoop.hbase.MiniClusterRule;
052import org.apache.hadoop.hbase.StartTestingClusterOption;
053import org.apache.hadoop.hbase.TableName;
054import org.apache.hadoop.hbase.Waiter;
055import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
056import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
057import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
058import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule;
059import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule;
060import org.apache.hadoop.hbase.trace.TraceUtil;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.JVMClusterUtil;
063import org.apache.hadoop.hbase.util.Pair;
064import org.hamcrest.Matcher;
065import org.junit.ClassRule;
066import org.junit.Rule;
067import org.junit.Test;
068import org.junit.rules.ExternalResource;
069import org.junit.rules.RuleChain;
070import org.junit.rules.TestName;
071import org.junit.rules.TestRule;
072
073public abstract class AbstractTestAsyncTableScan {
074
075  protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
076
077  private static Configuration createConfiguration() {
078    Configuration conf = new Configuration();
079    // Disable directory sharing to prevent race conditions when tests run in parallel.
080    // Each test instance gets its own isolated directories to avoid one test's tearDown()
081    // deleting directories another parallel test is still using.
082    conf.setBoolean("hbase.test.disable-directory-sharing", true);
083    return conf;
084  }
085
086  protected static final MiniClusterRule MINI_CLUSTER_RULE =
087    MiniClusterRule.newBuilder().setConfiguration(createConfiguration())
088      .setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();
089
090  protected static final ConnectionRule CONN_RULE =
091    ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);
092
093  private static final class Setup extends ExternalResource {
094    @Override
095    protected void before() throws Throwable {
096      final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
097      final AsyncConnection conn = CONN_RULE.getAsyncConnection();
098
099      byte[][] splitKeys = new byte[8][];
100      for (int i = 111; i < 999; i += 111) {
101        splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
102      }
103      testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys);
104      testingUtil.waitTableAvailable(TABLE_NAME);
105      conn.getTable(TABLE_NAME)
106        .putAll(IntStream.range(0, COUNT)
107          .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
108            .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i)))
109          .collect(Collectors.toList()))
110        .get();
111    }
112  }
113
114  @ClassRule
115  public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
116    .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());
117
118  @Rule
119  public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);
120
121  @Rule
122  public final TestName testName = new TestName();
123
124  protected static TableName TABLE_NAME = TableName.valueOf("async");
125
126  protected static byte[] FAMILY = Bytes.toBytes("cf");
127
128  protected static byte[] CQ1 = Bytes.toBytes("cq1");
129
130  protected static byte[] CQ2 = Bytes.toBytes("cq2");
131
132  protected static int COUNT = 1000;
133
134  private static Scan createNormalScan() {
135    return new Scan();
136  }
137
138  private static Scan createBatchScan() {
139    return new Scan().setBatch(1);
140  }
141
142  // set a small result size for testing flow control
143  private static Scan createSmallResultSizeScan() {
144    return new Scan().setMaxResultSize(1);
145  }
146
147  private static Scan createBatchSmallResultSizeScan() {
148    return new Scan().setBatch(1).setMaxResultSize(1);
149  }
150
151  private static AsyncTable<?> getRawTable() {
152    return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
153  }
154
155  private static AsyncTable<?> getTable() {
156    return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
157  }
158
159  private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
160    return Arrays.asList(Pair.newPair("normal", AbstractTestAsyncTableScan::createNormalScan),
161      Pair.newPair("batch", AbstractTestAsyncTableScan::createBatchScan),
162      Pair.newPair("smallResultSize", AbstractTestAsyncTableScan::createSmallResultSizeScan),
163      Pair.newPair("batchSmallResultSize",
164        AbstractTestAsyncTableScan::createBatchSmallResultSizeScan));
165  }
166
167  protected static List<Object[]> getScanCreatorParams() {
168    return getScanCreator().stream().map(p -> new Object[] { p.getFirst(), p.getSecond() })
169      .collect(Collectors.toList());
170  }
171
172  private static List<Pair<String, Supplier<AsyncTable<?>>>> getTableCreator() {
173    return Arrays.asList(Pair.newPair("raw", AbstractTestAsyncTableScan::getRawTable),
174      Pair.newPair("normal", AbstractTestAsyncTableScan::getTable));
175  }
176
177  protected static List<Object[]> getTableAndScanCreatorParams() {
178    List<Pair<String, Supplier<AsyncTable<?>>>> tableCreator = getTableCreator();
179    List<Pair<String, Supplier<Scan>>> scanCreator = getScanCreator();
180    return tableCreator.stream()
181      .flatMap(tp -> scanCreator.stream()
182        .map(sp -> new Object[] { tp.getFirst(), tp.getSecond(), sp.getFirst(), sp.getSecond() }))
183      .collect(Collectors.toList());
184  }
185
186  protected abstract Scan createScan();
187
188  protected abstract List<Result> doScan(Scan scan, int closeAfter) throws Exception;
189
190  /**
191   * Used by implementation classes to assert the correctness of spans produced under test.
192   */
193  protected abstract void assertTraceContinuity();
194
195  /**
196   * Used by implementation classes to assert the correctness of spans having errors.
197   */
198  protected abstract void
199    assertTraceError(final Matcher<io.opentelemetry.api.common.Attributes> exceptionMatcher);
200
201  protected final List<Result> convertFromBatchResult(List<Result> results) {
202    assertEquals(0, results.size() % 2);
203    return IntStream.range(0, results.size() / 2).mapToObj(i -> {
204      try {
205        return Result
206          .createCompleteResult(Arrays.asList(results.get(2 * i), results.get(2 * i + 1)));
207      } catch (IOException e) {
208        throw new UncheckedIOException(e);
209      }
210    }).collect(Collectors.toList());
211  }
212
213  protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
214    final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
215    Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
216      "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
217  }
218
219  protected static Stream<SpanData> spanStream() {
220    return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
221  }
222
223  @Test
224  public void testScanAll() throws Exception {
225    List<Result> results = doScan(createScan(), -1);
226    // make sure all scanners are closed at RS side
227    MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
228      .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
229        rs -> assertEquals(
230          "The scanner count of " + rs.getServerName() + " is "
231            + rs.getRSRpcServices().getScannersCount(),
232          0, rs.getRSRpcServices().getScannersCount()));
233    assertEquals(COUNT, results.size());
234    IntStream.range(0, COUNT).forEach(i -> {
235      Result result = results.get(i);
236      assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
237      assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
238    });
239  }
240
241  private void assertResultEquals(Result result, int i) {
242    assertEquals(String.format("%03d", i), Bytes.toString(result.getRow()));
243    assertEquals(i, Bytes.toInt(result.getValue(FAMILY, CQ1)));
244    assertEquals(i * i, Bytes.toInt(result.getValue(FAMILY, CQ2)));
245  }
246
247  @Test
248  public void testReversedScanAll() throws Exception {
249    List<Result> results =
250      TraceUtil.trace(() -> doScan(createScan().setReversed(true), -1), testName.getMethodName());
251    assertEquals(COUNT, results.size());
252    IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1));
253    assertTraceContinuity();
254  }
255
256  @Test
257  public void testScanNoStopKey() throws Exception {
258    int start = 345;
259    List<Result> results = TraceUtil.trace(
260      () -> doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1),
261      testName.getMethodName());
262    assertEquals(COUNT - start, results.size());
263    IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i));
264    assertTraceContinuity();
265  }
266
267  @Test
268  public void testReverseScanNoStopKey() throws Exception {
269    int start = 765;
270    final Scan scan =
271      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true);
272    List<Result> results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName());
273    assertEquals(start + 1, results.size());
274    IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i));
275    assertTraceContinuity();
276  }
277
278  @Test
279  public void testScanWrongColumnFamily() {
280    final Exception e = assertThrows(Exception.class,
281      () -> TraceUtil.trace(
282        () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1),
283        testName.getMethodName()));
284    // hamcrest generic enforcement for `anyOf` is a pain; skip it
285    // but -- don't we always unwrap ExecutionExceptions -- bug?
286    if (e instanceof NoSuchColumnFamilyException) {
287      final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e;
288      assertThat(ex, isA(NoSuchColumnFamilyException.class));
289    } else if (e instanceof ExecutionException) {
290      final ExecutionException ex = (ExecutionException) e;
291      assertThat(ex, allOf(isA(ExecutionException.class),
292        hasProperty("cause", isA(NoSuchColumnFamilyException.class))));
293    } else {
294      fail("Found unexpected Exception " + e);
295    }
296    assertTraceError(anyOf(
297      containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE),
298        endsWith(NoSuchColumnFamilyException.class.getName())),
299      allOf(
300        containsEntry(is(HBaseSemanticAttributes.EXCEPTION_TYPE),
301          endsWith(RemoteWithExtrasException.class.getName())),
302        containsEntry(is(HBaseSemanticAttributes.EXCEPTION_MESSAGE),
303          containsString(NoSuchColumnFamilyException.class.getName())))));
304  }
305
306  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
307    int limit) throws Exception {
308    testScan(start, startInclusive, stop, stopInclusive, limit, -1);
309  }
310
311  private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
312    int limit, int closeAfter) throws Exception {
313    Scan scan =
314      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
315        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive);
316    if (limit > 0) {
317      scan.setLimit(limit);
318    }
319    List<Result> results = doScan(scan, closeAfter);
320    int actualStart = startInclusive ? start : start + 1;
321    int actualStop = stopInclusive ? stop + 1 : stop;
322    int count = actualStop - actualStart;
323    if (limit > 0) {
324      count = Math.min(count, limit);
325    }
326    if (closeAfter > 0) {
327      count = Math.min(count, closeAfter);
328    }
329    assertEquals(count, results.size());
330    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart + i));
331  }
332
333  private void testReversedScan(int start, boolean startInclusive, int stop, boolean stopInclusive,
334    int limit) throws Exception {
335    Scan scan =
336      createScan().withStartRow(Bytes.toBytes(String.format("%03d", start)), startInclusive)
337        .withStopRow(Bytes.toBytes(String.format("%03d", stop)), stopInclusive).setReversed(true);
338    if (limit > 0) {
339      scan.setLimit(limit);
340    }
341    List<Result> results = doScan(scan, -1);
342    int actualStart = startInclusive ? start : start - 1;
343    int actualStop = stopInclusive ? stop - 1 : stop;
344    int count = actualStart - actualStop;
345    if (limit > 0) {
346      count = Math.min(count, limit);
347    }
348    assertEquals(count, results.size());
349    IntStream.range(0, count).forEach(i -> assertResultEquals(results.get(i), actualStart - i));
350  }
351
352  @Test
353  public void testScanWithStartKeyAndStopKey() throws Exception {
354    testScan(1, true, 998, false, -1); // from first region to last region
355    testScan(123, true, 345, true, -1);
356    testScan(234, true, 456, false, -1);
357    testScan(345, false, 567, true, -1);
358    testScan(456, false, 678, false, -1);
359  }
360
361  @Test
362  public void testReversedScanWithStartKeyAndStopKey() throws Exception {
363    testReversedScan(998, true, 1, false, -1); // from last region to first region
364    testReversedScan(543, true, 321, true, -1);
365    testReversedScan(654, true, 432, false, -1);
366    testReversedScan(765, false, 543, true, -1);
367    testReversedScan(876, false, 654, false, -1);
368  }
369
370  @Test
371  public void testScanAtRegionBoundary() throws Exception {
372    testScan(222, true, 333, true, -1);
373    testScan(333, true, 444, false, -1);
374    testScan(444, false, 555, true, -1);
375    testScan(555, false, 666, false, -1);
376  }
377
378  @Test
379  public void testReversedScanAtRegionBoundary() throws Exception {
380    testReversedScan(333, true, 222, true, -1);
381    testReversedScan(444, true, 333, false, -1);
382    testReversedScan(555, false, 444, true, -1);
383    testReversedScan(666, false, 555, false, -1);
384  }
385
386  @Test
387  public void testScanWithLimit() throws Exception {
388    testScan(1, true, 998, false, 900); // from first region to last region
389    testScan(123, true, 234, true, 100);
390    testScan(234, true, 456, false, 100);
391    testScan(345, false, 567, true, 100);
392    testScan(456, false, 678, false, 100);
393  }
394
395  @Test
396  public void testScanWithLimitGreaterThanActualCount() throws Exception {
397    testScan(1, true, 998, false, 1000); // from first region to last region
398    testScan(123, true, 345, true, 200);
399    testScan(234, true, 456, false, 200);
400    testScan(345, false, 567, true, 200);
401    testScan(456, false, 678, false, 200);
402  }
403
404  @Test
405  public void testReversedScanWithLimit() throws Exception {
406    testReversedScan(998, true, 1, false, 900); // from last region to first region
407    testReversedScan(543, true, 321, true, 100);
408    testReversedScan(654, true, 432, false, 100);
409    testReversedScan(765, false, 543, true, 100);
410    testReversedScan(876, false, 654, false, 100);
411  }
412
413  @Test
414  public void testReversedScanWithLimitGreaterThanActualCount() throws Exception {
415    testReversedScan(998, true, 1, false, 1000); // from last region to first region
416    testReversedScan(543, true, 321, true, 200);
417    testReversedScan(654, true, 432, false, 200);
418    testReversedScan(765, false, 543, true, 200);
419    testReversedScan(876, false, 654, false, 200);
420  }
421
422  @Test
423  public void testScanEndingEarly() throws Exception {
424    testScan(1, true, 998, false, 0, 900); // from first region to last region
425    testScan(123, true, 234, true, 0, 100);
426    testScan(234, true, 456, false, 0, 100);
427    testScan(345, false, 567, true, 0, 100);
428    testScan(456, false, 678, false, 0, 100);
429  }
430}